| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- package topology
- import (
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/sequence"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- )
- // MockGrpcDialOption simulates grpc connection for testing
- type MockGrpcDialOption struct{}
- // simulateVolumeAllocation mocks the volume allocation process
- func simulateVolumeAllocation(server *DataNode, vid needle.VolumeId, option *VolumeGrowOption) error {
- // Simulate some processing time
- time.Sleep(time.Millisecond * 10)
- return nil
- }
- func TestVolumeGrowth_ReservationBasedAllocation(t *testing.T) {
- // Create test topology with single server for predictable behavior
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
- // Create data center and rack
- dc := NewDataCenter("dc1")
- topo.LinkChildNode(dc)
- rack := NewRack("rack1")
- dc.LinkChildNode(rack)
- // Create single data node with limited capacity
- dn := NewDataNode("server1")
- rack.LinkChildNode(dn)
- // Set up disk with limited capacity (only 5 volumes)
- disk := NewDisk(types.HardDriveType.String())
- disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
- dn.LinkChildNode(disk)
- // Test volume growth with reservation
- vg := NewDefaultVolumeGrowth()
- rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
- option := &VolumeGrowOption{
- Collection: "test",
- ReplicaPlacement: rp,
- DiskType: types.HardDriveType,
- }
- // Try to create volumes and verify reservations work
- for i := 0; i < 5; i++ {
- servers, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
- if err != nil {
- t.Errorf("Failed to find slots with reservation on iteration %d: %v", i, err)
- continue
- }
- if len(servers) != 1 {
- t.Errorf("Expected 1 server for replica placement 000, got %d", len(servers))
- }
- if len(reservation.reservationIds) != 1 {
- t.Errorf("Expected 1 reservation ID, got %d", len(reservation.reservationIds))
- }
- // Verify the reservation is on our expected server
- server := servers[0]
- if server != dn {
- t.Errorf("Expected volume to be allocated on server1, got %s", server.Id())
- }
- // Check available space before and after reservation
- availableBeforeCreation := server.AvailableSpaceFor(option)
- expectedBefore := int64(5 - i)
- if availableBeforeCreation != expectedBefore {
- t.Errorf("Iteration %d: Expected %d base available space, got %d", i, expectedBefore, availableBeforeCreation)
- }
- // Simulate successful volume creation
- disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
- deltaDiskUsage := &DiskUsageCounts{
- volumeCount: 1,
- }
- disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
- // Release reservation after successful creation
- reservation.releaseAllReservations()
- // Verify available space after creation
- availableAfterCreation := server.AvailableSpaceFor(option)
- expectedAfter := int64(5 - i - 1)
- if availableAfterCreation != expectedAfter {
- t.Errorf("Iteration %d: Expected %d available space after creation, got %d", i, expectedAfter, availableAfterCreation)
- }
- }
- // After 5 volumes, should have no more capacity
- _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
- if err == nil {
- t.Error("Expected volume allocation to fail when server is at capacity")
- }
- }
- func TestVolumeGrowth_ConcurrentAllocationPreventsRaceCondition(t *testing.T) {
- // Create test topology with very limited capacity
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
- dc := NewDataCenter("dc1")
- topo.LinkChildNode(dc)
- rack := NewRack("rack1")
- dc.LinkChildNode(rack)
- // Single data node with capacity for only 5 volumes
- dn := NewDataNode("server1")
- rack.LinkChildNode(dn)
- disk := NewDisk(types.HardDriveType.String())
- disk.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
- dn.LinkChildNode(disk)
- vg := NewDefaultVolumeGrowth()
- rp, _ := super_block.NewReplicaPlacementFromString("000") // Single copy (no replicas)
- option := &VolumeGrowOption{
- Collection: "test",
- ReplicaPlacement: rp,
- DiskType: types.HardDriveType,
- }
- // Simulate concurrent volume creation attempts
- const concurrentRequests = 10
- var wg sync.WaitGroup
- var successCount, failureCount atomic.Int32
- for i := 0; i < concurrentRequests; i++ {
- wg.Add(1)
- go func(requestId int) {
- defer wg.Done()
- _, reservation, err := vg.findEmptySlotsForOneVolume(topo, option, true)
- if err != nil {
- failureCount.Add(1)
- t.Logf("Request %d failed as expected: %v", requestId, err)
- } else {
- successCount.Add(1)
- t.Logf("Request %d succeeded, got reservation", requestId)
- // Release the reservation to simulate completion
- if reservation != nil {
- reservation.releaseAllReservations()
- // Simulate volume creation by incrementing count
- disk := dn.children[NodeId(types.HardDriveType.String())].(*Disk)
- deltaDiskUsage := &DiskUsageCounts{
- volumeCount: 1,
- }
- disk.UpAdjustDiskUsageDelta(types.HardDriveType, deltaDiskUsage)
- }
- }
- }(i)
- }
- wg.Wait()
- // With reservation system, only 5 requests should succeed (capacity limit)
- // The rest should fail due to insufficient capacity
- if successCount.Load() != 5 {
- t.Errorf("Expected exactly 5 successful reservations, got %d", successCount.Load())
- }
- if failureCount.Load() != 5 {
- t.Errorf("Expected exactly 5 failed reservations, got %d", failureCount.Load())
- }
- // Verify final state
- finalAvailable := dn.AvailableSpaceFor(option)
- if finalAvailable != 0 {
- t.Errorf("Expected 0 available space after all allocations, got %d", finalAvailable)
- }
- t.Logf("Concurrent test completed: %d successes, %d failures", successCount.Load(), failureCount.Load())
- }
- func TestVolumeGrowth_ReservationFailureRollback(t *testing.T) {
- // Create topology with multiple servers, but limited total capacity
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
- dc := NewDataCenter("dc1")
- topo.LinkChildNode(dc)
- rack := NewRack("rack1")
- dc.LinkChildNode(rack)
- // Create two servers with different available capacity
- dn1 := NewDataNode("server1")
- dn2 := NewDataNode("server2")
- rack.LinkChildNode(dn1)
- rack.LinkChildNode(dn2)
- // Server 1: 5 available slots
- disk1 := NewDisk(types.HardDriveType.String())
- disk1.diskUsages.getOrCreateDisk(types.HardDriveType).maxVolumeCount = 5
- dn1.LinkChildNode(disk1)
- // Server 2: 0 available slots (full)
- disk2 := NewDisk(types.HardDriveType.String())
- diskUsage2 := disk2.diskUsages.getOrCreateDisk(types.HardDriveType)
- diskUsage2.maxVolumeCount = 5
- diskUsage2.volumeCount = 5
- dn2.LinkChildNode(disk2)
- vg := NewDefaultVolumeGrowth()
- rp, _ := super_block.NewReplicaPlacementFromString("010") // requires 2 replicas
- option := &VolumeGrowOption{
- Collection: "test",
- ReplicaPlacement: rp,
- DiskType: types.HardDriveType,
- }
- // This should fail because we can't satisfy replica requirements
- // (need 2 servers but only 1 has space)
- _, _, err := vg.findEmptySlotsForOneVolume(topo, option, true)
- if err == nil {
- t.Error("Expected reservation to fail due to insufficient replica capacity")
- }
- // Verify no reservations are left hanging
- available1 := dn1.AvailableSpaceForReservation(option)
- if available1 != 5 {
- t.Errorf("Expected server1 to have all capacity available after failed reservation, got %d", available1)
- }
- available2 := dn2.AvailableSpaceForReservation(option)
- if available2 != 0 {
- t.Errorf("Expected server2 to have no capacity available, got %d", available2)
- }
- }
- func TestVolumeGrowth_ReservationTimeout(t *testing.T) {
- dn := NewDataNode("server1")
- diskType := types.HardDriveType
- // Set up capacity
- diskUsage := dn.diskUsages.getOrCreateDisk(diskType)
- diskUsage.maxVolumeCount = 5
- // Create a reservation
- reservationId, success := dn.TryReserveCapacity(diskType, 2)
- if !success {
- t.Fatal("Expected successful reservation")
- }
- // Manually set the reservation time to simulate old reservation
- dn.capacityReservations.Lock()
- if reservation, exists := dn.capacityReservations.reservations[reservationId]; exists {
- reservation.createdAt = time.Now().Add(-10 * time.Minute)
- }
- dn.capacityReservations.Unlock()
- // Try another reservation - this should trigger cleanup and succeed
- _, success = dn.TryReserveCapacity(diskType, 3)
- if !success {
- t.Error("Expected reservation to succeed after cleanup of expired reservation")
- }
- // Original reservation should be cleaned up
- option := &VolumeGrowOption{DiskType: diskType}
- available := dn.AvailableSpaceForReservation(option)
- if available != 2 { // 5 - 3 = 2
- t.Errorf("Expected 2 available slots after cleanup and new reservation, got %d", available)
- }
- }
|