// race_condition_stress_test.go

package topology

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/sequence"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
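
// These tests exercise the reservation-based volume allocation path:
// findEmptySlotsForOneVolume reserves slots on the selected servers, the test
// simulates volume creation by pushing a DiskUsageCounts delta up the topology
// with UpAdjustDiskUsageDelta, and the reservation is then released with
// releaseAllReservations. They are most useful under the race detector, e.g.
// `go test -race -run TestRaceConditionStress` (package path assumed to be
// weed/topology in the seaweedfs repo).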

// TestRaceConditionStress simulates the original issue scenario:
// high concurrent writes causing capacity misjudgment.
func TestRaceConditionStress(t *testing.T) {
	// Create a cluster similar to the issue description:
	// 3 volume servers, 200GB each, 5GB volume limit = 40 volumes max per server.
	const (
		numServers          = 3
		volumeLimitMB       = 5000 // 5GB in MB
		storagePerServerGB  = 200  // 200GB per server
		maxVolumesPerServer = storagePerServerGB * 1024 / volumeLimitMB // 200*1024/5000 = 40
		concurrentRequests  = 50 // high concurrency, as in the issue
	)

	// Create the test topology.
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), uint64(volumeLimitMB)*1024*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)

	// Create 3 volume servers with realistic capacity.
	servers := make([]*DataNode, numServers)
	for i := 0; i < numServers; i++ {
		dn := NewDataNode(fmt.Sprintf("server%d", i+1))
		rack.LinkChildNode(dn)

		// Set up a disk with capacity for 40 volumes.
		disk := NewDisk(types.HardDriveType.String())
		disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = maxVolumesPerServer
		dn.LinkChildNode(disk)
		servers[i] = dn
	}

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000") // single replica, as in the issue
	option := &VolumeGrowOption{
		Collection:       "test-bucket-large", // same collection name as in the issue
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	// Track results.
	var successfulAllocations int64
	var failedAllocations int64
	var totalVolumesCreated int64
	var wg sync.WaitGroup

	// Launch concurrent volume creation requests.
	startTime := time.Now()
	for i := 0; i < concurrentRequests; i++ {
		wg.Add(1)
		go func(requestId int) {
			defer wg.Done()
			// This is the critical test: multiple goroutines trying to allocate simultaneously.
			allocatedServers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
			if err != nil {
				atomic.AddInt64(&failedAllocations, 1)
				t.Logf("Request %d failed: %v", requestId, err)
				return
			}

			// Simulate volume creation delay (as in the real scenario).
			time.Sleep(time.Millisecond * 50)

			// Simulate successful volume creation on each chosen server.
			for _, server := range allocatedServers {
				disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
				deltaDiskUsage := &DiskUsageCounts{
					volumeCount: 1,
				}
				disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
				atomic.AddInt64(&totalVolumesCreated, 1)
			}
			// Release the reservations (simulates successful registration).
			reservation.releaseAllReservations()
			atomic.AddInt64(&successfulAllocations, 1)
		}(i)
	}
	wg.Wait()
	duration := time.Since(startTime)

	// Verify results.
	t.Logf("Test completed in %v", duration)
	t.Logf("Successful allocations: %d", successfulAllocations)
	t.Logf("Failed allocations: %d", failedAllocations)
	t.Logf("Total volumes created: %d", totalVolumesCreated)

	// Check that per-server capacity limits are respected.
	totalCapacityUsed := int64(0)
	for i, server := range servers {
		disk := server.children[NodeId(types.HardDriveType.String())].(*Disk)
		volumeCount := disk.diskUsages.getOrCreateDisk(types.HardDriveType).volumeCount
		totalCapacityUsed += volumeCount
		t.Logf("Server %d: %d volumes (max: %d)", i+1, volumeCount, maxVolumesPerServer)

		// Critical check: no server should exceed its capacity.
		if volumeCount > maxVolumesPerServer {
			t.Errorf("RACE CONDITION DETECTED: Server %d exceeded capacity: %d > %d",
				i+1, volumeCount, maxVolumesPerServer)
		}
	}

	// Verify the totals are consistent.
	if totalVolumesCreated != totalCapacityUsed {
		t.Errorf("Volume count mismatch: created=%d, actual=%d", totalVolumesCreated, totalCapacityUsed)
	}

	// The total should never exceed the cluster capacity (120 volumes for 3 servers × 40 each).
	maxClusterCapacity := int64(numServers * maxVolumesPerServer)
	if totalCapacityUsed > maxClusterCapacity {
		t.Errorf("RACE CONDITION DETECTED: Cluster capacity exceeded: %d > %d",
			totalCapacityUsed, maxClusterCapacity)
	}

	// With reservations, allocation should be controlled:
	// successful + failed requests must add up to concurrentRequests.
	if successfulAllocations+failedAllocations != concurrentRequests {
		t.Errorf("Request count mismatch: success=%d + failed=%d != total=%d",
			successfulAllocations, failedAllocations, concurrentRequests)
	}

	t.Logf("✅ Race condition test passed: capacity limits respected with %d concurrent requests",
		concurrentRequests)
}

// TestCapacityJudgmentAccuracy verifies that the capacity calculation is accurate
// under various load conditions.
func TestCapacityJudgmentAccuracy(t *testing.T) {
	// Create a single server with known capacity.
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 5*1024*1024*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)
	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)

	// Server with capacity for exactly 10 volumes.
	disk := NewDisk(types.HardDriveType.String())
	diskUsage := disk.diskUsages.getOrCreateDisk(types.HardDriveType)
	diskUsage.maxVolumeCount = 10
	dn.LinkChildNode(disk)

	// Also set the max volume count at the DataNode level (it gets propagated up).
	dn.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 10

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")
	option := &VolumeGrowOption{
		Collection:       "test",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	// Test accurate capacity reporting at each step.
	for i := 0; i < 10; i++ {
		// Check the available space before the reservation.
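		// AvailableSpaceFor reports free slots based on committed volume counts,
		// while AvailableSpaceForReservation is expected to also subtract slots
		// held by outstanding reservations; the two should agree here because no
		// reservation is pending yet.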
		availableBefore := dn.AvailableSpaceFor(option)
		availableForReservation := dn.AvailableSpaceForReservation(option)
		expectedAvailable := int64(10 - i)
		if availableBefore != expectedAvailable {
			t.Errorf("Step %d: Expected %d available, got %d", i, expectedAvailable, availableBefore)
		}
		if availableForReservation != expectedAvailable {
			t.Errorf("Step %d: Expected %d available for reservation, got %d", i, expectedAvailable, availableForReservation)
		}

		// Try to reserve and allocate.
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			t.Fatalf("Step %d: Unexpected reservation failure: %v", i, err)
		}

		// Check that the space available for reservation decreased.
		availableAfterReservation := dn.AvailableSpaceForReservation(option)
		if availableAfterReservation != expectedAvailable-1 {
			t.Errorf("Step %d: Expected %d available after reservation, got %d",
				i, expectedAvailable-1, availableAfterReservation)
		}

		// Simulate successful volume creation by properly updating the disk usage hierarchy.
		disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
		// Create a volume usage delta to simulate volume creation.
		deltaDiskUsage := &DiskUsageCounts{
			volumeCount: 1,
		}
		// Propagate the usage up the hierarchy.
		disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)

		// Debug: check the volume count after the update.
		diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
		currentVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
		t.Logf("Step %d: Volume count after update: %d", i, currentVolumeCount)

		// Release the reservation.
		reservation.releaseAllReservations()

		// Verify the final state.
		availableAfter := dn.AvailableSpaceFor(option)
		expectedAfter := int64(10 - i - 1)
		if availableAfter != expectedAfter {
			t.Errorf("Step %d: Expected %d available after creation, got %d",
				i, expectedAfter, availableAfter)

			// More debugging.
			diskUsageOnNode := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
			maxVolumes := atomic.LoadInt64(&diskUsageOnNode.maxVolumeCount)
			remoteVolumes := atomic.LoadInt64(&diskUsageOnNode.remoteVolumeCount)
			actualVolumeCount := atomic.LoadInt64(&diskUsageOnNode.volumeCount)
			t.Logf("Debug Step %d: max=%d, volume=%d, remote=%d", i, maxVolumes, actualVolumeCount, remoteVolumes)
		}
	}

	// At this point, no more reservations should succeed.
	_, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
	if err == nil {
		t.Error("Expected reservation to fail when at capacity")
	}

	t.Logf("✅ Capacity judgment accuracy test passed")
}

// TestReservationSystemPerformance measures the performance impact of reservations.
func TestReservationSystemPerformance(t *testing.T) {
	// Create the topology.
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
	dc := NewDataCenter("dc1")
	topo.LinkChildNode(dc)
	rack := NewRack("rack1")
	dc.LinkChildNode(rack)
	dn := NewDataNode("server1")
	rack.LinkChildNode(dn)

	disk := NewDisk(types.HardDriveType.String())
	disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 1000
	dn.LinkChildNode(disk)

	vg := NewDefaultVolumeGrowth()
	rp, _ := super_block.NewReplicaPlacementFromString("000")
	option := &VolumeGrowOption{
		Collection:       "test",
		ReplicaPlacement: rp,
		DiskType:         types.HardDriveType,
	}

	// Benchmark reservation operations.
	const iterations = 1000
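	// Note: maxVolumeCount (1000) matches the iteration count, so the disk never
	// runs out of capacity and every reservation below is expected to succeed.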
	startTime := time.Now()
	for i := 0; i < iterations; i++ {
		_, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
		if err != nil {
			t.Fatalf("Iteration %d failed: %v", i, err)
		}
		reservation.releaseAllReservations()

		// Simulate volume creation.
		diskUsage := dn.diskUsages.getOrCreateDisk(types.HardDriveType)
		atomic.AddInt64(&diskUsage.volumeCount, 1)
	}
	duration := time.Since(startTime)
	avgDuration := duration / iterations

	t.Logf("Performance: %d reservations in %v (avg: %v per reservation)",
		iterations, duration, avgDuration)

	// Performance should be reasonable (less than 1ms per reservation on average).
	if avgDuration > time.Millisecond {
		t.Errorf("Reservation system performance concern: %v per reservation", avgDuration)
	} else {
		t.Logf("✅ Performance test passed: %v per reservation", avgDuration)
	}
}