active_topology_test.go

package topology

import (
	"fmt"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// findDiskByID finds a disk by ID in a slice of disks; it is a test helper that reduces code duplication.
func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo {
	for _, disk := range disks {
		if disk.DiskID == diskID {
			return disk
		}
	}
	return nil
}
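
// The tests below build several near-identical balance TaskSpec values by hand.
// A hedged sketch of a helper that could cut that duplication; it reuses only the
// TaskSpec fields already exercised in this file, assumes VolumeID is uint32 (as the
// assertions below suggest), and the 1 GiB size simply mirrors the balance tests.
func newBalanceTaskSpec(taskID string, volumeID uint32, source TaskSourceSpec, destination TaskDestinationSpec) TaskSpec {
	return TaskSpec{
		TaskID:       taskID,
		TaskType:     TaskTypeBalance,
		VolumeID:     volumeID,
		VolumeSize:   1024 * 1024 * 1024, // same 1 GiB volume size the balance tests use
		Sources:      []TaskSourceSpec{source},
		Destinations: []TaskDestinationSpec{destination},
	}
}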

// TestActiveTopologyBasicOperations tests basic topology management
func TestActiveTopologyBasicOperations(t *testing.T) {
	topology := NewActiveTopology(10)
	assert.NotNil(t, topology)
	assert.Equal(t, 10, topology.recentTaskWindowSeconds)

	// Test empty topology
	assert.Equal(t, 0, len(topology.nodes))
	assert.Equal(t, 0, len(topology.disks))
	assert.Equal(t, 0, len(topology.pendingTasks))
}

// TestActiveTopologyUpdate tests topology updates from master
func TestActiveTopologyUpdate(t *testing.T) {
	topology := NewActiveTopology(10)

	// Create sample topology info
	topologyInfo := createSampleTopology()
	err := topology.UpdateTopology(topologyInfo)
	require.NoError(t, err)

	// Verify topology structure
	assert.Equal(t, 2, len(topology.nodes)) // 2 nodes
	assert.Equal(t, 4, len(topology.disks)) // 4 disks total (2 per node)

	// Verify node structure
	node1, exists := topology.nodes["10.0.0.1:8080"]
	require.True(t, exists)
	assert.Equal(t, "dc1", node1.dataCenter)
	assert.Equal(t, "rack1", node1.rack)
	assert.Equal(t, 2, len(node1.disks))

	// Verify disk structure
	disk1, exists := topology.disks["10.0.0.1:8080:0"]
	require.True(t, exists)
	assert.Equal(t, uint32(0), disk1.DiskID)
	assert.Equal(t, "hdd", disk1.DiskType)
	assert.Equal(t, "dc1", disk1.DataCenter)
}

// TestTaskLifecycle tests the complete task lifecycle
func TestTaskLifecycle(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	taskID := "balance-001"

	// 1. Add pending task
	err := topology.AddPendingTask(TaskSpec{
		TaskID:     taskID,
		TaskType:   TaskTypeBalance,
		VolumeID:   1001,
		VolumeSize: 1024 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.2:8080", DiskID: 1},
		},
	})
	assert.NoError(t, err, "Should add pending task successfully")

	// Verify pending state
	assert.Equal(t, 1, len(topology.pendingTasks))
	assert.Equal(t, 0, len(topology.assignedTasks))
	assert.Equal(t, 0, len(topology.recentTasks))

	task := topology.pendingTasks[taskID]
	assert.Equal(t, TaskStatusPending, task.Status)
	assert.Equal(t, uint32(1001), task.VolumeID)

	// Verify task assigned to disks
	sourceDisk := topology.disks["10.0.0.1:8080:0"]
	targetDisk := topology.disks["10.0.0.2:8080:1"]
	assert.Equal(t, 1, len(sourceDisk.pendingTasks))
	assert.Equal(t, 1, len(targetDisk.pendingTasks))

	// 2. Assign task
	err = topology.AssignTask(taskID)
	require.NoError(t, err)

	// Verify assigned state
	assert.Equal(t, 0, len(topology.pendingTasks))
	assert.Equal(t, 1, len(topology.assignedTasks))
	assert.Equal(t, 0, len(topology.recentTasks))

	task = topology.assignedTasks[taskID]
	assert.Equal(t, TaskStatusInProgress, task.Status)

	// Verify task moved to assigned on disks
	assert.Equal(t, 0, len(sourceDisk.pendingTasks))
	assert.Equal(t, 1, len(sourceDisk.assignedTasks))
	assert.Equal(t, 0, len(targetDisk.pendingTasks))
	assert.Equal(t, 1, len(targetDisk.assignedTasks))

	// 3. Complete task
	err = topology.CompleteTask(taskID)
	require.NoError(t, err)

	// Verify completed state
	assert.Equal(t, 0, len(topology.pendingTasks))
	assert.Equal(t, 0, len(topology.assignedTasks))
	assert.Equal(t, 1, len(topology.recentTasks))

	task = topology.recentTasks[taskID]
	assert.Equal(t, TaskStatusCompleted, task.Status)
	assert.False(t, task.CompletedAt.IsZero())
}
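
// A hedged follow-on sketch, assuming HasRecentTaskForVolume reports tasks that sit in
// recentTasks within the configured window (the behavior the detection-scenario test
// below relies on). It reuses only calls that already appear in this file.
func TestCompletedTaskIsReportedAsRecent(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	err := topology.AddPendingTask(TaskSpec{
		TaskID:       "balance-recent",
		TaskType:     TaskTypeBalance,
		VolumeID:     1001,
		VolumeSize:   1024 * 1024 * 1024,
		Sources:      []TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
		Destinations: []TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 1}},
	})
	require.NoError(t, err)
	require.NoError(t, topology.AssignTask("balance-recent"))
	require.NoError(t, topology.CompleteTask("balance-recent"))

	// The just-completed task should be visible as a recent task for its volume,
	// preventing immediate re-detection of the same work.
	assert.True(t, topology.HasRecentTaskForVolume(1001, TaskTypeBalance))
}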

// TestTaskDetectionScenarios tests various task detection scenarios
func TestTaskDetectionScenarios(t *testing.T) {
	tests := []struct {
		name          string
		scenario      func() *ActiveTopology
		expectedTasks map[string]bool // taskType -> shouldDetect
	}{
		{
			name: "Empty cluster - no tasks needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createEmptyTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false,
				"vacuum":  false,
				"ec":      false,
			},
		},
		{
			name: "Unbalanced cluster - balance task needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createUnbalancedTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": true,
				"vacuum":  false,
				"ec":      false,
			},
		},
		{
			name: "High garbage ratio - vacuum task needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createHighGarbageTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false,
				"vacuum":  true,
				"ec":      false,
			},
		},
		{
			name: "Large volumes - EC task needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createLargeVolumeTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false,
				"vacuum":  false,
				"ec":      true,
			},
		},
		{
			name: "Recent tasks - no immediate re-detection",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createUnbalancedTopology())
				// Add recent balance task
				topology.recentTasks["recent-balance"] = &taskState{
					VolumeID:    1001,
					TaskType:    TaskTypeBalance,
					Status:      TaskStatusCompleted,
					CompletedAt: time.Now().Add(-5 * time.Second), // 5 seconds ago
				}
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false, // Should not detect due to recent task
				"vacuum":  false,
				"ec":      false,
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			topology := tt.scenario()

			// Test balance task detection
			shouldDetectBalance := tt.expectedTasks["balance"]
			actualDetectBalance := !topology.HasRecentTaskForVolume(1001, TaskTypeBalance)
			if shouldDetectBalance {
				assert.True(t, actualDetectBalance, "Should detect balance task")
			} else {
				// Note: in a real implementation, task detection would be more sophisticated.
				// This is a simplified test of the recent-task prevention mechanism.
			}

			// Test that recent tasks prevent re-detection
			if len(topology.recentTasks) > 0 {
				for _, task := range topology.recentTasks {
					hasRecent := topology.HasRecentTaskForVolume(task.VolumeID, task.TaskType)
					assert.True(t, hasRecent, "Should find recent task for volume %d", task.VolumeID)
				}
			}
		})
	}
}

// TestTargetSelectionScenarios tests target selection for different task types
func TestTargetSelectionScenarios(t *testing.T) {
	tests := []struct {
		name               string
		topology           *ActiveTopology
		taskType           TaskType
		excludeNode        string
		expectedTargets    int
		expectedBestTarget string
	}{
		{
			name:            "Balance task - find least loaded disk",
			topology:        createTopologyWithLoad(),
			taskType:        TaskTypeBalance,
			excludeNode:     "10.0.0.1:8080", // Exclude source node
			expectedTargets: 2,               // 2 disks on other node
		},
		{
			name:            "EC task - find multiple available disks",
			topology:        createTopologyForEC(),
			taskType:        TaskTypeErasureCoding,
			excludeNode:     "", // Don't exclude any nodes
			expectedTargets: 4,  // All 4 disks available
		},
		{
			name:            "Vacuum task - avoid conflicting disks",
			topology:        createTopologyWithConflicts(),
			taskType:        TaskTypeVacuum,
			excludeNode:     "",
			expectedTargets: 1, // Only 1 disk without conflicts (conflicts exclude more disks)
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			availableDisks := tt.topology.GetAvailableDisks(tt.taskType, tt.excludeNode)
			assert.Equal(t, tt.expectedTargets, len(availableDisks),
				"Expected %d available disks, got %d", tt.expectedTargets, len(availableDisks))

			// Verify disks are actually available
			for _, disk := range availableDisks {
				assert.NotEqual(t, tt.excludeNode, disk.NodeID,
					"Available disk should not be on excluded node")
				assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2")
			}
		})
	}
}

// TestDiskLoadCalculation tests disk load calculation
func TestDiskLoadCalculation(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	// Initially no load
	disks := topology.GetNodeDisks("10.0.0.1:8080")
	targetDisk := findDiskByID(disks, 0)
	require.NotNil(t, targetDisk, "Should find disk with ID 0")
	assert.Equal(t, 0, targetDisk.LoadCount)

	// Add pending task
	err := topology.AddPendingTask(TaskSpec{
		TaskID:     "task1",
		TaskType:   TaskTypeBalance,
		VolumeID:   1001,
		VolumeSize: 1024 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.2:8080", DiskID: 1},
		},
	})
	assert.NoError(t, err, "Should add pending task successfully")

	// Check load increased
	disks = topology.GetNodeDisks("10.0.0.1:8080")
	targetDisk = findDiskByID(disks, 0)
	assert.Equal(t, 1, targetDisk.LoadCount)

	// Add another task to the same disk
	err = topology.AddPendingTask(TaskSpec{
		TaskID:     "task2",
		TaskType:   TaskTypeVacuum,
		VolumeID:   1002,
		VolumeSize: 0,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
		},
	})
	assert.NoError(t, err, "Should add vacuum task successfully")

	disks = topology.GetNodeDisks("10.0.0.1:8080")
	targetDisk = findDiskByID(disks, 0)
	assert.Equal(t, 2, targetDisk.LoadCount)

	// Move one task to assigned
	topology.AssignTask("task1")

	// Load should still be 2 (1 pending + 1 assigned)
	disks = topology.GetNodeDisks("10.0.0.1:8080")
	targetDisk = findDiskByID(disks, 0)
	assert.Equal(t, 2, targetDisk.LoadCount)

	// Complete one task
	topology.CompleteTask("task1")

	// Load should decrease to 1
	disks = topology.GetNodeDisks("10.0.0.1:8080")
	targetDisk = findDiskByID(disks, 0)
	assert.Equal(t, 1, targetDisk.LoadCount)
}
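
// A hedged companion sketch, assuming the destination disk of a balance task accumulates
// load the same way the source disk does (TestTaskLifecycle above shows the destination
// disk receiving the pending task as well). The task ID and volume ID are illustrative.
func TestDestinationDiskLoad(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	err := topology.AddPendingTask(TaskSpec{
		TaskID:       "balance-dest-load",
		TaskType:     TaskTypeBalance,
		VolumeID:     1003,
		VolumeSize:   1024 * 1024 * 1024,
		Sources:      []TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
		Destinations: []TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 1}},
	})
	require.NoError(t, err)

	// The destination disk (node 10.0.0.2, disk 1) should now carry load from the pending task.
	destDisks := topology.GetNodeDisks("10.0.0.2:8080")
	destDisk := findDiskByID(destDisks, 1)
	require.NotNil(t, destDisk, "Should find destination disk with ID 1")
	assert.Equal(t, 1, destDisk.LoadCount)
}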

// TestTaskConflictDetection tests task conflict detection
func TestTaskConflictDetection(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	// Add a balance task
	err := topology.AddPendingTask(TaskSpec{
		TaskID:     "balance1",
		TaskType:   TaskTypeBalance,
		VolumeID:   1001,
		VolumeSize: 1024 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.2:8080", DiskID: 1},
		},
	})
	assert.NoError(t, err, "Should add balance task successfully")
	topology.AssignTask("balance1")

	// Try to get available disks for vacuum (conflicts with balance)
	availableDisks := topology.GetAvailableDisks(TaskTypeVacuum, "")

	// Source disk should not be available due to conflict
	sourceDiskAvailable := false
	for _, disk := range availableDisks {
		if disk.NodeID == "10.0.0.1:8080" && disk.DiskID == 0 {
			sourceDiskAvailable = true
			break
		}
	}
	assert.False(t, sourceDiskAvailable, "Source disk should not be available due to task conflict")
}
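
// A hedged extension of the conflict check: disks not touched by the balance task should
// still be offered for vacuum work, mirroring the expectation encoded in
// createTopologyWithConflicts below, where only untouched disks remain available.
func TestUntouchedDiskRemainsAvailable(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	err := topology.AddPendingTask(TaskSpec{
		TaskID:       "balance-conflict",
		TaskType:     TaskTypeBalance,
		VolumeID:     1001,
		VolumeSize:   1024 * 1024 * 1024,
		Sources:      []TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
		Destinations: []TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 1}},
	})
	require.NoError(t, err)
	require.NoError(t, topology.AssignTask("balance-conflict"))

	// Disk 1 on the source node carries no task, so it should still be available for vacuum.
	availableDisks := topology.GetAvailableDisks(TaskTypeVacuum, "")
	idleDiskAvailable := false
	for _, disk := range availableDisks {
		if disk.NodeID == "10.0.0.1:8080" && disk.DiskID == 1 {
			idleDiskAvailable = true
			break
		}
	}
	assert.True(t, idleDiskAvailable, "Idle disk should remain available for vacuum")
}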

// TestPublicInterfaces tests the public interface methods
func TestPublicInterfaces(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	// Test GetAllNodes
	nodes := topology.GetAllNodes()
	assert.Equal(t, 2, len(nodes))
	assert.Contains(t, nodes, "10.0.0.1:8080")
	assert.Contains(t, nodes, "10.0.0.2:8080")

	// Test GetNodeDisks
	disks := topology.GetNodeDisks("10.0.0.1:8080")
	assert.Equal(t, 2, len(disks))

	// Test with non-existent node
	disks = topology.GetNodeDisks("non-existent")
	assert.Nil(t, disks)
}

// Helper functions to create test topologies

// createSampleTopology returns a 2-node topology (one DC, one rack), each node
// exposing an "hdd" disk (ID 0) and an "ssd" disk (ID 1), for 4 disks in total.
func createSampleTopology() *master_pb.TopologyInfo {
	return &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, VolumeCount: 10, MaxVolumeCount: 100},
									"ssd": {DiskId: 1, VolumeCount: 5, MaxVolumeCount: 50},
								},
							},
							{
								Id: "10.0.0.2:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, VolumeCount: 8, MaxVolumeCount: 100},
									"ssd": {DiskId: 1, VolumeCount: 3, MaxVolumeCount: 50},
								},
							},
						},
					},
				},
			},
		},
	}
}

func createEmptyTopology() *master_pb.TopologyInfo {
	return &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, VolumeCount: 0, MaxVolumeCount: 100},
								},
							},
						},
					},
				},
			},
		},
	}
}

func createUnbalancedTopology() *master_pb.TopologyInfo {
	return &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, VolumeCount: 90, MaxVolumeCount: 100}, // Heavily loaded
								},
							},
							{
								Id: "10.0.0.2:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, VolumeCount: 10, MaxVolumeCount: 100}, // Lightly loaded
								},
							},
						},
					},
				},
			},
		},
	}
}

func createHighGarbageTopology() *master_pb.TopologyInfo {
	// In a real implementation, this would include volume-level garbage metrics
	return createSampleTopology()
}

func createLargeVolumeTopology() *master_pb.TopologyInfo {
	// In a real implementation, this would include volume-level size metrics
	return createSampleTopology()
}

func createTopologyWithLoad() *ActiveTopology {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	// Add some existing tasks to create load
	err := topology.AddPendingTask(TaskSpec{
		TaskID:     "existing1",
		TaskType:   TaskTypeVacuum,
		VolumeID:   2001,
		VolumeSize: 0,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
		},
	})
	if err != nil {
		// In a test helper, just log the error instead of failing
		fmt.Printf("Warning: Failed to add existing task: %v\n", err)
	}
	topology.AssignTask("existing1")
	return topology
}

func createTopologyForEC() *ActiveTopology {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())
	return topology
}

func createTopologyWithConflicts() *ActiveTopology {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	// Add conflicting tasks
	err := topology.AddPendingTask(TaskSpec{
		TaskID:     "balance1",
		TaskType:   TaskTypeBalance,
		VolumeID:   3001,
		VolumeSize: 1024 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.2:8080", DiskID: 0},
		},
	})
	if err != nil {
		fmt.Printf("Warning: Failed to add balance task: %v\n", err)
	}
	topology.AssignTask("balance1")

	err = topology.AddPendingTask(TaskSpec{
		TaskID:     "ec1",
		TaskType:   TaskTypeErasureCoding,
		VolumeID:   3002,
		VolumeSize: 1024 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 1},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "", DiskID: 0}, // EC doesn't have a single destination
		},
	})
	if err != nil {
		fmt.Printf("Warning: Failed to add EC task: %v\n", err)
	}
	topology.AssignTask("ec1")

	return topology
}

// TestDestinationPlanning tests that the public interface works correctly.
// NOTE: Destination planning is now done in the task detection phase, not in ActiveTopology.
func TestDestinationPlanning(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	// Test that GetAvailableDisks works for destination planning
	t.Run("GetAvailableDisks functionality", func(t *testing.T) {
		availableDisks := topology.GetAvailableDisks(TaskTypeBalance, "10.0.0.1:8080")
		assert.Greater(t, len(availableDisks), 0)

		// Should exclude the source node
		for _, disk := range availableDisks {
			assert.NotEqual(t, "10.0.0.1:8080", disk.NodeID)
		}
	})

	// Test that topology state can be used for planning
	t.Run("Topology provides planning information", func(t *testing.T) {
		topologyInfo := topology.GetTopologyInfo()
		assert.NotNil(t, topologyInfo)
		assert.Greater(t, len(topologyInfo.DataCenterInfos), 0)

		// Test getting node disks
		disks := topology.GetNodeDisks("10.0.0.1:8080")
		assert.Greater(t, len(disks), 0)
	})
}
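
// A hedged follow-up sketch: summing GetNodeDisks over both sample nodes should account
// for every disk in the sample topology (4 disks in total, matching the count asserted in
// TestActiveTopologyUpdate). It deliberately avoids assuming the element type of GetAllNodes.
func TestNodeDisksCoverSampleTopology(t *testing.T) {
	topology := NewActiveTopology(10)
	topology.UpdateTopology(createSampleTopology())

	totalDisks := len(topology.GetNodeDisks("10.0.0.1:8080")) + len(topology.GetNodeDisks("10.0.0.2:8080"))
	assert.Equal(t, 4, totalDisks, "Both sample nodes together should expose all 4 disks")
}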