package topology

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/sequence"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
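
// These tests exercise the reservation-aware allocation path. As used
// throughout this file, findEmptySlotsForOneVolume(topo, option, true) is
// expected to return the chosen data nodes together with a reservation
// handle, and releaseAllReservations to release that handle once the
// volumes have been registered.
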
// TestRaceConditionStress simulates the original issue scenario:
// high-concurrency writes causing capacity misjudgment.
func TestRaceConditionStress(t *testing.T) {
	// Create a cluster similar to the issue description:
	// 3 volume servers, 200GB each, 5GB volume limit = 40 volumes max per server.
	const (
		numServers          = 3
		volumeLimitMB       = 5000 // 5GB in MB
		storagePerServerGB  = 200  // 200GB per server
		maxVolumesPerServer = storagePerServerGB * 1024 / volumeLimitMB // 200*1024/5000 = 40 (integer division)
		concurrentRequests  = 50 // high concurrency, as in the issue
	)
	// Create the test topology.
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), uint64(volumeLimitMB)*1024*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)

	// Create 3 volume servers with realistic capacity.
	servers := make([]*DataNode, numServers)
	for i := 0; i < numServers; i++ {
		dn := NewDataNode(fmt.Sprintf("server%d", i+1))
		rack.LinkChildNode(dn)
		// Set up a disk with capacity for 40 volumes.
		disk := NewDisk(types.HardDriveType.String())
		disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = maxVolumesPerServer
		dn.LinkChildNode(disk)
		servers[i] = dn
	}

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000") // single replica, as in the issue
	option := &VolumeGrowOption{
		Collection:       "test-bucket-large", // same collection name as in the issue
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}
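
	// With 3 servers × 40 slots = 120 slots and only 50 concurrent requests,
	// a correct reservation system should let every request succeed while
	// never overcommitting any single server.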

	// Track results.
	var successfulAllocations int64
	var failedAllocations int64
	var totalVolumesCreated int64
	var wg sync.WaitGroup

	// Launch concurrent volume creation requests.
	startTime := time.Now()
	for i := 0; i < concurrentRequests; i++ {
		wg.Add(1)
		go func(requestId int) {
			defer wg.Done()
			// The critical step: many goroutines trying to allocate simultaneously.
			allocatedServers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
			if err != nil {
				atomic.AddInt64(&failedAllocations, 1)
				t.Logf("Request %d failed: %v", requestId, err)
				return
			}
			// Simulate the volume creation delay seen in a real scenario.
			time.Sleep(time.Millisecond * 50)
			// Simulate successful volume creation.
			for _, server := range allocatedServers {
				disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
				deltaDiskUsage := &DiskUsageCounts{
					volumeCount: 1,
				}
				disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
				atomic.AddInt64(&totalVolumesCreated, 1)
			}
			// Release the reservations (simulates successful registration).
			reservation.releaseAllReservations()
			atomic.AddInt64(&successfulAllocations, 1)
		}(i)
	}
	wg.Wait()
	duration := time.Since(startTime)

	// Verify results.
	t.Logf("Test completed in %v", duration)
	t.Logf("Successful allocations: %d", successfulAllocations)
	t.Logf("Failed allocations: %d", failedAllocations)
	t.Logf("Total volumes created: %d", totalVolumesCreated)

	// Check that capacity limits are respected.
	totalCapacityUsed := int64(0)
	for i, server := range servers {
		disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
		volumeCount := disk.diskUsages.getOrCreateDisk(types.HardDriveType).volumeCount
		totalCapacityUsed += volumeCount
		t.Logf("Server %d: %d volumes (max: %d)", i+1, volumeCount, maxVolumesPerServer)
		// Critical check: no server may exceed its capacity.
		if volumeCount > maxVolumesPerServer {
			t.Errorf("RACE CONDITION DETECTED: Server %d exceeded capacity: %d > %d",
				i+1, volumeCount, maxVolumesPerServer)
		}
	}

	// Verify the totals are consistent.
	if totalVolumesCreated != totalCapacityUsed {
		t.Errorf("Volume count mismatch: created=%d, actual=%d", totalVolumesCreated, totalCapacityUsed)
	}

	// The total must never exceed the cluster capacity (3 servers × 40 volumes = 120).
	maxClusterCapacity := int64(numServers * maxVolumesPerServer)
	if totalCapacityUsed > maxClusterCapacity {
		t.Errorf("RACE CONDITION DETECTED: Cluster capacity exceeded: %d > %d",
			totalCapacityUsed, maxClusterCapacity)
	}

	// With reservations, allocation is controlled: every request either
	// succeeds or fails, so the two counts must sum to concurrentRequests.
	if successfulAllocations+failedAllocations != concurrentRequests {
		t.Errorf("Request count mismatch: success=%d + failed=%d != total=%d",
			successfulAllocations, failedAllocations, concurrentRequests)
	}

	t.Logf("✅ Race condition test passed: Capacity limits respected with %d concurrent requests",
		concurrentRequests)
}

// TestCapacityJudgmentAccuracy verifies that the capacity calculation is
// accurate under various load conditions.
func TestCapacityJudgmentAccuracy(t *testing.T) {
	// Create a single server with known capacity.
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 5*1024*1024*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)
	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)

	// Server with capacity for exactly 10 volumes.
	disk := NewDisk(types.HardDriveType.String())
	diskUsage := disk.diskUsages.getOrCreateDisk(types.HardDriveType)
	diskUsage.maxVolumeCount = 10
	dn.LinkChildNode(disk)
	// Also set the max volume count at the DataNode level (normally this is propagated up).
	dn.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 10

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")
	option := &VolumeGrowOption{
		Collection:       "test",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}
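
	// Each iteration checks the reservation lifecycle at step i (assuming
	// AvailableSpaceFor counts only committed volumes, while
	// AvailableSpaceForReservation also subtracts outstanding reservations):
	//   1. before reserving, both report 10-i free slots;
	//   2. while the reservation is held, AvailableSpaceForReservation drops by one;
	//   3. after the simulated creation and release, AvailableSpaceFor reports 10-i-1.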

	// Test accurate capacity reporting at each step.
	for i := 0; i < 10; i++ {
		// Check the available space before reserving.
		availableBefore := dn.AvailableSpaceFor(option)
		availableForReservation := dn.AvailableSpaceForReservation(option)
		expectedAvailable := int64(10 - i)

		if availableBefore != expectedAvailable {
			t.Errorf("Step %d: Expected %d available, got %d", i, expectedAvailable, availableBefore)
		}
		if availableForReservation != expectedAvailable {
			t.Errorf("Step %d: Expected %d available for reservation, got %d", i, expectedAvailable, availableForReservation)
		}

		// Try to reserve and allocate.
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			t.Fatalf("Step %d: Unexpected reservation failure: %v", i, err)
		}

		// Check that the space available for reservation decreased.
		availableAfterReservation := dn.AvailableSpaceForReservation(option)
		if availableAfterReservation != expectedAvailable-1 {
			t.Errorf("Step %d: Expected %d available after reservation, got %d",
				i, expectedAvailable-1, availableAfterReservation)
		}

		// Simulate successful volume creation by updating the disk usage hierarchy.
		serverDisk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
		deltaDiskUsage := &DiskUsageCounts{
			volumeCount: 1,
		}
		// Propagate the usage delta up the hierarchy.
		serverDisk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)

		// Debug: check the volume count after the update.
		diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
		currentVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
		t.Logf("Step %d: Volume count after update: %d", i, currentVolumeCount)

		// Release the reservation.
		reservation.releaseAllReservations()

		// Verify the final state.
		availableAfter := dn.AvailableSpaceFor(option)
		expectedAfter := int64(10 - i - 1)
		if availableAfter != expectedAfter {
			t.Errorf("Step %d: Expected %d available after creation, got %d",
				i, expectedAfter, availableAfter)
			// Extra debugging on failure.
			maxVolumes := atomic.LoadInt64(&diskUsageOnNode.maxVolumeCount)
			remoteVolumes := atomic.LoadInt64(&diskUsageOnNode.remoteVolumeCount)
			actualVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
			t.Logf("Debug Step %d: max=%d, volume=%d, remote=%d", i, maxVolumes, actualVolumeCount, remoteVolumes)
		}
	}

	// At this point, no further reservation should succeed.
	_, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
	if err == nil {
		t.Error("Expected reservation to fail when at capacity")
	}

	t.Logf("✅ Capacity judgment accuracy test passed")
}

// TestReservationSystemPerformance measures the performance impact of reservations.
func TestReservationSystemPerformance(t *testing.T) {
	// Create the topology.
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)
	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)
	disk := NewDisk(types.HardDriveType.String())
	disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 1000
	dn.LinkChildNode(disk)

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")
	option := &VolumeGrowOption{
		Collection:       "test",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}
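
	// Each iteration below performs one full reserve/release round trip plus a
	// simulated volume creation. maxVolumeCount (1000) matches the iteration
	// count, so capacity is never exhausted during the benchmark.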
	// Benchmark the reservation operations.
	const iterations = 1000
	startTime := time.Now()
	for i := 0; i < iterations; i++ {
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			t.Fatalf("Iteration %d failed: %v", i, err)
		}
		reservation.releaseAllReservations()

		// Simulate volume creation.
		diskUsage := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
		atomic.AddInt64(&diskUsage.volumeCount, 1)
	}
	duration := time.Since(startTime)
	avgDuration := duration / iterations

	t.Logf("Performance: %d reservations in %v (avg: %v per reservation)",
		iterations, duration, avgDuration)

	// Performance should be reasonable: less than 1ms per reservation on average.
	if avgDuration > time.Millisecond {
		t.Errorf("Reservation system performance concern: %v per reservation", avgDuration)
	} else {
		t.Logf("✅ Performance test passed: %v per reservation", avgDuration)
	}
}