| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607 |
- package topology
- import (
- "fmt"
- "testing"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- )
- // Helper function to find a disk by ID for testing - reduces code duplication
- func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo {
- for _, disk := range disks {
- if disk.DiskID == diskID {
- return disk
- }
- }
- return nil
- }
- // TestActiveTopologyBasicOperations tests basic topology management
- func TestActiveTopologyBasicOperations(t *testing.T) {
- topology := NewActiveTopology(10)
- assert.NotNil(t, topology)
- assert.Equal(t, 10, topology.recentTaskWindowSeconds)
- // Test empty topology
- assert.Equal(t, 0, len(topology.nodes))
- assert.Equal(t, 0, len(topology.disks))
- assert.Equal(t, 0, len(topology.pendingTasks))
- }
- // TestActiveTopologyUpdate tests topology updates from master
- func TestActiveTopologyUpdate(t *testing.T) {
- topology := NewActiveTopology(10)
- // Create sample topology info
- topologyInfo := createSampleTopology()
- err := topology.UpdateTopology(topologyInfo)
- require.NoError(t, err)
- // Verify topology structure
- assert.Equal(t, 2, len(topology.nodes)) // 2 nodes
- assert.Equal(t, 4, len(topology.disks)) // 4 disks total (2 per node)
- // Verify node structure
- node1, exists := topology.nodes["10.0.0.1:8080"]
- require.True(t, exists)
- assert.Equal(t, "dc1", node1.dataCenter)
- assert.Equal(t, "rack1", node1.rack)
- assert.Equal(t, 2, len(node1.disks))
- // Verify disk structure
- disk1, exists := topology.disks["10.0.0.1:8080:0"]
- require.True(t, exists)
- assert.Equal(t, uint32(0), disk1.DiskID)
- assert.Equal(t, "hdd", disk1.DiskType)
- assert.Equal(t, "dc1", disk1.DataCenter)
- }
- // TestTaskLifecycle tests the complete task lifecycle
- func TestTaskLifecycle(t *testing.T) {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- taskID := "balance-001"
- // 1. Add pending task
- err := topology.AddPendingTask(TaskSpec{
- TaskID: taskID,
- TaskType: TaskTypeBalance,
- VolumeID: 1001,
- VolumeSize: 1024 * 1024 * 1024,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 0},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "10.0.0.2:8080", DiskID: 1},
- },
- })
- assert.NoError(t, err, "Should add pending task successfully")
- // Verify pending state
- assert.Equal(t, 1, len(topology.pendingTasks))
- assert.Equal(t, 0, len(topology.assignedTasks))
- assert.Equal(t, 0, len(topology.recentTasks))
- task := topology.pendingTasks[taskID]
- assert.Equal(t, TaskStatusPending, task.Status)
- assert.Equal(t, uint32(1001), task.VolumeID)
- // Verify task assigned to disks
- sourceDisk := topology.disks["10.0.0.1:8080:0"]
- targetDisk := topology.disks["10.0.0.2:8080:1"]
- assert.Equal(t, 1, len(sourceDisk.pendingTasks))
- assert.Equal(t, 1, len(targetDisk.pendingTasks))
- // 2. Assign task
- err = topology.AssignTask(taskID)
- require.NoError(t, err)
- // Verify assigned state
- assert.Equal(t, 0, len(topology.pendingTasks))
- assert.Equal(t, 1, len(topology.assignedTasks))
- assert.Equal(t, 0, len(topology.recentTasks))
- task = topology.assignedTasks[taskID]
- assert.Equal(t, TaskStatusInProgress, task.Status)
- // Verify task moved to assigned on disks
- assert.Equal(t, 0, len(sourceDisk.pendingTasks))
- assert.Equal(t, 1, len(sourceDisk.assignedTasks))
- assert.Equal(t, 0, len(targetDisk.pendingTasks))
- assert.Equal(t, 1, len(targetDisk.assignedTasks))
- // 3. Complete task
- err = topology.CompleteTask(taskID)
- require.NoError(t, err)
- // Verify completed state
- assert.Equal(t, 0, len(topology.pendingTasks))
- assert.Equal(t, 0, len(topology.assignedTasks))
- assert.Equal(t, 1, len(topology.recentTasks))
- task = topology.recentTasks[taskID]
- assert.Equal(t, TaskStatusCompleted, task.Status)
- assert.False(t, task.CompletedAt.IsZero())
- }
// TestTaskDetectionScenarios tests various task detection scenarios.
// Each case builds a topology via its scenario func and declares, per
// task type, whether detection should fire. Note the asserts below only
// exercise the recent-task suppression mechanism, not full detection.
func TestTaskDetectionScenarios(t *testing.T) {
	tests := []struct {
		name          string
		scenario      func() *ActiveTopology
		expectedTasks map[string]bool // taskType -> shouldDetect
	}{
		{
			name: "Empty cluster - no tasks needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createEmptyTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false,
				"vacuum":  false,
				"ec":      false,
			},
		},
		{
			name: "Unbalanced cluster - balance task needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createUnbalancedTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": true,
				"vacuum":  false,
				"ec":      false,
			},
		},
		{
			name: "High garbage ratio - vacuum task needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createHighGarbageTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false,
				"vacuum":  true,
				"ec":      false,
			},
		},
		{
			name: "Large volumes - EC task needed",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createLargeVolumeTopology())
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false,
				"vacuum":  false,
				"ec":      true,
			},
		},
		{
			name: "Recent tasks - no immediate re-detection",
			scenario: func() *ActiveTopology {
				topology := NewActiveTopology(10)
				topology.UpdateTopology(createUnbalancedTopology())

				// Inject a recently completed balance task for volume 1001
				// directly into internal state, inside the 10s window.
				topology.recentTasks["recent-balance"] = &taskState{
					VolumeID:    1001,
					TaskType:    TaskTypeBalance,
					Status:      TaskStatusCompleted,
					CompletedAt: time.Now().Add(-5 * time.Second), // 5 seconds ago
				}
				return topology
			},
			expectedTasks: map[string]bool{
				"balance": false, // Should not detect due to recent task
				"vacuum":  false,
				"ec":      false,
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			topology := tt.scenario()

			// Test balance task detection. Only the shouldDetect=true path
			// is asserted; volume 1001 is the volume used by scenarios above.
			shouldDetectBalance := tt.expectedTasks["balance"]
			actualDetectBalance := !topology.HasRecentTaskForVolume(1001, TaskTypeBalance)
			if shouldDetectBalance {
				assert.True(t, actualDetectBalance, "Should detect balance task")
			} else {
				// Note: In real implementation, task detection would be more sophisticated
				// This is a simplified test of the recent task prevention mechanism
			}

			// Test that recent tasks prevent re-detection: every injected
			// recent task must be found by HasRecentTaskForVolume.
			if len(topology.recentTasks) > 0 {
				for _, task := range topology.recentTasks {
					hasRecent := topology.HasRecentTaskForVolume(task.VolumeID, task.TaskType)
					assert.True(t, hasRecent, "Should find recent task for volume %d", task.VolumeID)
				}
			}
		})
	}
}
- // TestTargetSelectionScenarios tests target selection for different task types
- func TestTargetSelectionScenarios(t *testing.T) {
- tests := []struct {
- name string
- topology *ActiveTopology
- taskType TaskType
- excludeNode string
- expectedTargets int
- expectedBestTarget string
- }{
- {
- name: "Balance task - find least loaded disk",
- topology: createTopologyWithLoad(),
- taskType: TaskTypeBalance,
- excludeNode: "10.0.0.1:8080", // Exclude source node
- expectedTargets: 2, // 2 disks on other node
- },
- {
- name: "EC task - find multiple available disks",
- topology: createTopologyForEC(),
- taskType: TaskTypeErasureCoding,
- excludeNode: "", // Don't exclude any nodes
- expectedTargets: 4, // All 4 disks available
- },
- {
- name: "Vacuum task - avoid conflicting disks",
- topology: createTopologyWithConflicts(),
- taskType: TaskTypeVacuum,
- excludeNode: "",
- expectedTargets: 1, // Only 1 disk without conflicts (conflicts exclude more disks)
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- availableDisks := tt.topology.GetAvailableDisks(tt.taskType, tt.excludeNode)
- assert.Equal(t, tt.expectedTargets, len(availableDisks),
- "Expected %d available disks, got %d", tt.expectedTargets, len(availableDisks))
- // Verify disks are actually available
- for _, disk := range availableDisks {
- assert.NotEqual(t, tt.excludeNode, disk.NodeID,
- "Available disk should not be on excluded node")
- assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2")
- }
- })
- }
- }
- // TestDiskLoadCalculation tests disk load calculation
- func TestDiskLoadCalculation(t *testing.T) {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- // Initially no load
- disks := topology.GetNodeDisks("10.0.0.1:8080")
- targetDisk := findDiskByID(disks, 0)
- require.NotNil(t, targetDisk, "Should find disk with ID 0")
- assert.Equal(t, 0, targetDisk.LoadCount)
- // Add pending task
- err := topology.AddPendingTask(TaskSpec{
- TaskID: "task1",
- TaskType: TaskTypeBalance,
- VolumeID: 1001,
- VolumeSize: 1024 * 1024 * 1024,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 0},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "10.0.0.2:8080", DiskID: 1},
- },
- })
- assert.NoError(t, err, "Should add pending task successfully")
- // Check load increased
- disks = topology.GetNodeDisks("10.0.0.1:8080")
- targetDisk = findDiskByID(disks, 0)
- assert.Equal(t, 1, targetDisk.LoadCount)
- // Add another task to same disk
- err = topology.AddPendingTask(TaskSpec{
- TaskID: "task2",
- TaskType: TaskTypeVacuum,
- VolumeID: 1002,
- VolumeSize: 0,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 0},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
- },
- })
- assert.NoError(t, err, "Should add vacuum task successfully")
- disks = topology.GetNodeDisks("10.0.0.1:8080")
- targetDisk = findDiskByID(disks, 0)
- assert.Equal(t, 2, targetDisk.LoadCount)
- // Move one task to assigned
- topology.AssignTask("task1")
- // Load should still be 2 (1 pending + 1 assigned)
- disks = topology.GetNodeDisks("10.0.0.1:8080")
- targetDisk = findDiskByID(disks, 0)
- assert.Equal(t, 2, targetDisk.LoadCount)
- // Complete one task
- topology.CompleteTask("task1")
- // Load should decrease to 1
- disks = topology.GetNodeDisks("10.0.0.1:8080")
- targetDisk = findDiskByID(disks, 0)
- assert.Equal(t, 1, targetDisk.LoadCount)
- }
- // TestTaskConflictDetection tests task conflict detection
- func TestTaskConflictDetection(t *testing.T) {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- // Add a balance task
- err := topology.AddPendingTask(TaskSpec{
- TaskID: "balance1",
- TaskType: TaskTypeBalance,
- VolumeID: 1001,
- VolumeSize: 1024 * 1024 * 1024,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 0},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "10.0.0.2:8080", DiskID: 1},
- },
- })
- assert.NoError(t, err, "Should add balance task successfully")
- topology.AssignTask("balance1")
- // Try to get available disks for vacuum (conflicts with balance)
- availableDisks := topology.GetAvailableDisks(TaskTypeVacuum, "")
- // Source disk should not be available due to conflict
- sourceDiskAvailable := false
- for _, disk := range availableDisks {
- if disk.NodeID == "10.0.0.1:8080" && disk.DiskID == 0 {
- sourceDiskAvailable = true
- break
- }
- }
- assert.False(t, sourceDiskAvailable, "Source disk should not be available due to task conflict")
- }
- // TestPublicInterfaces tests the public interface methods
- func TestPublicInterfaces(t *testing.T) {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- // Test GetAllNodes
- nodes := topology.GetAllNodes()
- assert.Equal(t, 2, len(nodes))
- assert.Contains(t, nodes, "10.0.0.1:8080")
- assert.Contains(t, nodes, "10.0.0.2:8080")
- // Test GetNodeDisks
- disks := topology.GetNodeDisks("10.0.0.1:8080")
- assert.Equal(t, 2, len(disks))
- // Test with non-existent node
- disks = topology.GetNodeDisks("non-existent")
- assert.Nil(t, disks)
- }
- // Helper functions to create test topologies
- func createSampleTopology() *master_pb.TopologyInfo {
- return &master_pb.TopologyInfo{
- DataCenterInfos: []*master_pb.DataCenterInfo{
- {
- Id: "dc1",
- RackInfos: []*master_pb.RackInfo{
- {
- Id: "rack1",
- DataNodeInfos: []*master_pb.DataNodeInfo{
- {
- Id: "10.0.0.1:8080",
- DiskInfos: map[string]*master_pb.DiskInfo{
- "hdd": {DiskId: 0, VolumeCount: 10, MaxVolumeCount: 100},
- "ssd": {DiskId: 1, VolumeCount: 5, MaxVolumeCount: 50},
- },
- },
- {
- Id: "10.0.0.2:8080",
- DiskInfos: map[string]*master_pb.DiskInfo{
- "hdd": {DiskId: 0, VolumeCount: 8, MaxVolumeCount: 100},
- "ssd": {DiskId: 1, VolumeCount: 3, MaxVolumeCount: 50},
- },
- },
- },
- },
- },
- },
- },
- }
- }
- func createEmptyTopology() *master_pb.TopologyInfo {
- return &master_pb.TopologyInfo{
- DataCenterInfos: []*master_pb.DataCenterInfo{
- {
- Id: "dc1",
- RackInfos: []*master_pb.RackInfo{
- {
- Id: "rack1",
- DataNodeInfos: []*master_pb.DataNodeInfo{
- {
- Id: "10.0.0.1:8080",
- DiskInfos: map[string]*master_pb.DiskInfo{
- "hdd": {DiskId: 0, VolumeCount: 0, MaxVolumeCount: 100},
- },
- },
- },
- },
- },
- },
- },
- }
- }
- func createUnbalancedTopology() *master_pb.TopologyInfo {
- return &master_pb.TopologyInfo{
- DataCenterInfos: []*master_pb.DataCenterInfo{
- {
- Id: "dc1",
- RackInfos: []*master_pb.RackInfo{
- {
- Id: "rack1",
- DataNodeInfos: []*master_pb.DataNodeInfo{
- {
- Id: "10.0.0.1:8080",
- DiskInfos: map[string]*master_pb.DiskInfo{
- "hdd": {DiskId: 0, VolumeCount: 90, MaxVolumeCount: 100}, // Very loaded
- },
- },
- {
- Id: "10.0.0.2:8080",
- DiskInfos: map[string]*master_pb.DiskInfo{
- "hdd": {DiskId: 0, VolumeCount: 10, MaxVolumeCount: 100}, // Lightly loaded
- },
- },
- },
- },
- },
- },
- },
- }
- }
- func createHighGarbageTopology() *master_pb.TopologyInfo {
- // In a real implementation, this would include volume-level garbage metrics
- return createSampleTopology()
- }
- func createLargeVolumeTopology() *master_pb.TopologyInfo {
- // In a real implementation, this would include volume-level size metrics
- return createSampleTopology()
- }
- func createTopologyWithLoad() *ActiveTopology {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- // Add some existing tasks to create load
- err := topology.AddPendingTask(TaskSpec{
- TaskID: "existing1",
- TaskType: TaskTypeVacuum,
- VolumeID: 2001,
- VolumeSize: 0,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 0},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
- },
- })
- if err != nil {
- // In test helper function, just log error instead of failing
- fmt.Printf("Warning: Failed to add existing task: %v\n", err)
- }
- topology.AssignTask("existing1")
- return topology
- }
- func createTopologyForEC() *ActiveTopology {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- return topology
- }
- func createTopologyWithConflicts() *ActiveTopology {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- // Add conflicting tasks
- err := topology.AddPendingTask(TaskSpec{
- TaskID: "balance1",
- TaskType: TaskTypeBalance,
- VolumeID: 3001,
- VolumeSize: 1024 * 1024 * 1024,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 0},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "10.0.0.2:8080", DiskID: 0},
- },
- })
- if err != nil {
- fmt.Printf("Warning: Failed to add balance task: %v\n", err)
- }
- topology.AssignTask("balance1")
- err = topology.AddPendingTask(TaskSpec{
- TaskID: "ec1",
- TaskType: TaskTypeErasureCoding,
- VolumeID: 3002,
- VolumeSize: 1024 * 1024 * 1024,
- Sources: []TaskSourceSpec{
- {ServerID: "10.0.0.1:8080", DiskID: 1},
- },
- Destinations: []TaskDestinationSpec{
- {ServerID: "", DiskID: 0}, // EC doesn't have single destination
- },
- })
- if err != nil {
- fmt.Printf("Warning: Failed to add EC task: %v\n", err)
- }
- topology.AssignTask("ec1")
- return topology
- }
- // TestDestinationPlanning tests that the public interface works correctly
- // NOTE: Destination planning is now done in task detection phase, not in ActiveTopology
- func TestDestinationPlanning(t *testing.T) {
- topology := NewActiveTopology(10)
- topology.UpdateTopology(createSampleTopology())
- // Test that GetAvailableDisks works for destination planning
- t.Run("GetAvailableDisks functionality", func(t *testing.T) {
- availableDisks := topology.GetAvailableDisks(TaskTypeBalance, "10.0.0.1:8080")
- assert.Greater(t, len(availableDisks), 0)
- // Should exclude the source node
- for _, disk := range availableDisks {
- assert.NotEqual(t, "10.0.0.1:8080", disk.NodeID)
- }
- })
- // Test that topology state can be used for planning
- t.Run("Topology provides planning information", func(t *testing.T) {
- topologyInfo := topology.GetTopologyInfo()
- assert.NotNil(t, topologyInfo)
- assert.Greater(t, len(topologyInfo.DataCenterInfos), 0)
- // Test getting node disks
- disks := topology.GetNodeDisks("10.0.0.1:8080")
- assert.Greater(t, len(disks), 0)
- })
- }
|