volume_growth_reservation_test.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package topology
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/sequence"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  11. )
  12. // MockGrpcDialOption simulates grpc connection for testing
  13. type MockGrpcDialOption struct{}
  14. // simulateVolumeAllocation mocks the volume allocation process
  15. func simulateVolumeAllocation(server *DataNode, vid needle.VolumeId, option *VolumeGrowOption) error {
  16. // Simulate some processing time
  17. time.Sleep(time.Millisecond * 10)
  18. return nil
  19. }
  20. func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) {
  21. // Create test topology with single server for predictable behavior
  22. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  23. // Create data center and rack
  24. dc := NewDataCenter("dc1")
  25. topo.LinkChildNode(dc)
  26. rack := NewRack("rack1")
  27. dc.LinkChildNode(rack)
  28. // Create single data node with limited capacity
  29. dn := NewDataNode("server1")
  30. rack.LinkChildNode(dn)
  31. // Set up disk with limited capacity (only 5 volumes)
  32. disk := NewDisk(types.HardDriveType.String())
  33. disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
  34. dn.LinkChildNode(disk)
  35. // Test volume growth with reservation
  36. vg := NewDefaultVolumeGrowth()
  37. rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
  38. option := &VolumeGrowOption{
  39. Collection: "test",
  40. ReplicaPlacement: rp,
  41. DiskType: types.HardDriveType,
  42. }
  43. // Try to create volumes and verify reservations work
  44. for i := 0; i < 5; i++ {
  45. servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
  46. if err != nil {
  47. t.Errorf("Failed to find slots with reservation on iteration %d: %v", i, err)
  48. continue
  49. }
  50. if len(servers) != 1 {
  51. t.Errorf("Expected 1 server for replica placement 000, got %d", len(servers))
  52. }
  53. if len(reservation.reservationIds) != 1 {
  54. t.Errorf("Expected 1 reservation ID, got %d", len(reservation.reservationIds))
  55. }
  56. // Verify the reservation is on our expected server
  57. server := servers[0]
  58. if server != dn {
  59. t.Errorf("Expected volume to be allocated on server1, got %s", server.Id())
  60. }
  61. // Check available space before and after reservation
  62. availableBeforeCreation := server.AvailableSpaceFor(option)
  63. expectedBefore := int64(5 - i)
  64. if availableBeforeCreation != expectedBefore {
  65. t.Errorf("Iteration %d: Expected %d base available space, got %d", i, expectedBefore, availableBeforeCreation)
  66. }
  67. // Simulate successful volume creation
  68. disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
  69. deltaDiskUsage := &DiskUsageCounts{
  70. volumeCount: 1,
  71. }
  72. disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
  73. // Release reservation after successful creation
  74. reservation.releaseAllReservations()
  75. // Verify available space after creation
  76. availableAfterCreation := server.AvailableSpaceFor(option)
  77. expectedAfter := int64(5 - i - 1)
  78. if availableAfterCreation != expectedAfter {
  79. t.Errorf("Iteration %d: Expected %d available space after creation, got %d", i, expectedAfter, availableAfterCreation)
  80. }
  81. }
  82. // After 5 volumes, should have no more capacity
  83. _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
  84. if err == nil {
  85. t.Error("Expected volume allocation to fail when server is at capacity")
  86. }
  87. }
  88. func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) {
  89. // Create test topology with very limited capacity
  90. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  91. dc := NewDataCenter("dc1")
  92. topo.LinkChildNode(dc)
  93. rack := NewRack("rack1")
  94. dc.LinkChildNode(rack)
  95. // Single data node with capacity for only 5 volumes
  96. dn := NewDataNode("server1")
  97. rack.LinkChildNode(dn)
  98. disk := NewDisk(types.HardDriveType.String())
  99. disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
  100. dn.LinkChildNode(disk)
  101. vg := NewDefaultVolumeGrowth()
  102. rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
  103. option := &VolumeGrowOption{
  104. Collection: "test",
  105. ReplicaPlacement: rp,
  106. DiskType: types.HardDriveType,
  107. }
  108. // Simulate concurrent volume creation attempts
  109. const concurrentRequests = 10
  110. var wg sync.WaitGroup
  111. var successCount, failureCount atomic.Int32
  112. for i := 0; i < concurrentRequests; i++ {
  113. wg.Add(1)
  114. go func(requestId int) {
  115. defer wg.Done()
  116. _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
  117. if err != nil {
  118. failureCount.Add(1)
  119. t.Logf("Request %d failed as expected: %v", requestId, err)
  120. } else {
  121. successCount.Add(1)
  122. t.Logf("Request %d succeeded, got reservation", requestId)
  123. // Release the reservation to simulate completion
  124. if reservation != nil {
  125. reservation.releaseAllReservations()
  126. // Simulate volume creation by incrementing count
  127. disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
  128. deltaDiskUsage := &DiskUsageCounts{
  129. volumeCount: 1,
  130. }
  131. disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
  132. }
  133. }
  134. }(i)
  135. }
  136. wg.Wait()
  137. // With reservation system, only 5 requests should succeed (capacity limit)
  138. // The rest should fail due to insufficient capacity
  139. if successCount.Load() != 5 {
  140. t.Errorf("Expected exactly 5 successful reservations, got %d", successCount.Load())
  141. }
  142. if failureCount.Load() != 5 {
  143. t.Errorf("Expected exactly 5 failed reservations, got %d", failureCount.Load())
  144. }
  145. // Verify final state
  146. finalAvailable := dn.AvailableSpaceFor(option)
  147. if finalAvailable != 0 {
  148. t.Errorf("Expected 0 available space after all allocations, got %d", finalAvailable)
  149. }
  150. t.Logf("Concurrent test completed: %d successes, %d failures", successCount.Load(), failureCount.Load())
  151. }
  152. func TestVolumeGrowth_ReservationFailureRollback(t *testing.T) {
  153. // Create topology with multiple servers, but limited total capacity
  154. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  155. dc := NewDataCenter("dc1")
  156. topo.LinkChildNode(dc)
  157. rack := NewRack("rack1")
  158. dc.LinkChildNode(rack)
  159. // Create two servers with different available capacity
  160. dn1 := NewDataNode("server1")
  161. dn2 := NewDataNode("server2")
  162. rack.LinkChildNode(dn1)
  163. rack.LinkChildNode(dn2)
  164. // Server 1: 5 available slots
  165. disk1 := NewDisk(types.HardDriveType.String())
  166. disk1.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
  167. dn1.LinkChildNode(disk1)
  168. // Server 2: 0 available slots (full)
  169. disk2 := NewDisk(types.HardDriveType.String())
  170. diskUsage2 := disk2.diskUsages.getOrCreateDisk(types.HardDriveType)
  171. diskUsage2.maxVolumeCount = 5
  172. diskUsage2.volumeCount = 5
  173. dn2.LinkChildNode(disk2)
  174. vg := NewDefaultVolumeGrowth()
  175. rp, _ := super_block.NewReplicaPlacementFromString("010") // requires 2 replicas
  176. option := &VolumeGrowOption{
  177. Collection: "test",
  178. ReplicaPlacement: rp,
  179. DiskType: types.HardDriveType,
  180. }
  181. // This should fail because we can't satisfy replica requirements
  182. // (need 2 servers but only 1 has space)
  183. _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
  184. if err == nil {
  185. t.Error("Expected reservation to fail due to insufficient replica capacity")
  186. }
  187. // Verify no reservations are left hanging
  188. available1 := dn1.AvailableSpaceForReservation(option)
  189. if available1 != 5 {
  190. t.Errorf("Expected server1 to have all capacity available after failed reservation, got %d", available1)
  191. }
  192. available2 := dn2.AvailableSpaceForReservation(option)
  193. if available2 != 0 {
  194. t.Errorf("Expected server2 to have no capacity available, got %d", available2)
  195. }
  196. }
  197. func TestVolumeGrowth_ReservationTimeout(t *testing.T) {
  198. dn := NewDataNode("server1")
  199. diskType := types.HardDriveType
  200. // Set up capacity
  201. diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
  202. diskUsage.maxVolumeCount = 5
  203. // Create a reservation
  204. reservationId, success := dn.TryReserveCapacity(diskType, 2)
  205. if !success {
  206. t.Fatal("Expected successful reservation")
  207. }
  208. // Manually set the reservation time to simulate old reservation
  209. dn.capacityReservations.Lock()
  210. if reservation, exists := dn.capacityReservations.reservations[reservationId]; exists {
  211. reservation.createdAt = time.Now().Add(-10 * time.Minute)
  212. }
  213. dn.capacityReservations.Unlock()
  214. // Try another reservation - this should trigger cleanup and succeed
  215. _, success = dn.TryReserveCapacity(diskType, 3)
  216. if !success {
  217. t.Error("Expected reservation to succeed after cleanup of expired reservation")
  218. }
  219. // Original reservation should be cleaned up
  220. option := &VolumeGrowOption{DiskType: diskType}
  221. available := dn.AvailableSpaceForReservation(option)
  222. if available != 2 { // 5 - 3 = 2
  223. t.Errorf("Expected 2 available slots after cleanup and new reservation, got %d", available)
  224. }
  225. }