// storage_slot_test.go

package topology

import (
	"fmt"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/stretchr/testify/assert"
)

// NOTE: These tests are designed to work with any value of erasure_coding.DataShardsCount.
// This ensures compatibility with custom erasure coding configurations where DataShardsCount
// might be changed from the default value of 10. All shard-to-volume conversion calculations
// are done dynamically using the actual constant value.

// testGetDiskStorageImpact is a test helper that provides the same interface as the removed
// GetDiskStorageImpact method. For simplicity, it returns the total impact as "planned"
// and zeros for "reserved" since the distinction is not critical for most test scenarios.
func testGetDiskStorageImpact(at *ActiveTopology, nodeID string, diskID uint32) (plannedVolumeSlots, reservedVolumeSlots int64, plannedShardSlots, reservedShardSlots int32, estimatedSize int64) {
	impact := at.GetEffectiveCapacityImpact(nodeID, diskID)
	// Return total impact as "planned" for test compatibility
	return int64(impact.VolumeSlots), 0, impact.ShardSlots, 0, 0
}

// TestStorageSlotChangeArithmetic tests the arithmetic operations on StorageSlotChange
func TestStorageSlotChangeArithmetic(t *testing.T) {
	// Test basic arithmetic operations
	a := StorageSlotChange{VolumeSlots: 5, ShardSlots: 10}
	b := StorageSlotChange{VolumeSlots: 3, ShardSlots: 8}

	// Test Add
	sum := a.Add(b)
	assert.Equal(t, StorageSlotChange{VolumeSlots: 8, ShardSlots: 18}, sum, "Add should work correctly")

	// Test Subtract
	diff := a.Subtract(b)
	assert.Equal(t, StorageSlotChange{VolumeSlots: 2, ShardSlots: 2}, diff, "Subtract should work correctly")

	// Test AddInPlace
	c := StorageSlotChange{VolumeSlots: 1, ShardSlots: 2}
	c.AddInPlace(b)
	assert.Equal(t, StorageSlotChange{VolumeSlots: 4, ShardSlots: 10}, c, "AddInPlace should modify in place")

	// Test SubtractInPlace
	d := StorageSlotChange{VolumeSlots: 10, ShardSlots: 20}
	d.SubtractInPlace(b)
	assert.Equal(t, StorageSlotChange{VolumeSlots: 7, ShardSlots: 12}, d, "SubtractInPlace should modify in place")

	// Test IsZero
	zero := StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
	nonZero := StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
	assert.True(t, zero.IsZero(), "Zero struct should return true for IsZero")
	assert.False(t, nonZero.IsZero(), "Non-zero struct should return false for IsZero")

	// Test ToVolumeSlots conversion
	impact1 := StorageSlotChange{VolumeSlots: 5, ShardSlots: 10}
	assert.Equal(t, int64(6), impact1.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be 5 + 10/%d = 6", erasure_coding.DataShardsCount))

	impact2 := StorageSlotChange{VolumeSlots: -2, ShardSlots: 25}
	assert.Equal(t, int64(0), impact2.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be -2 + 25/%d = 0", erasure_coding.DataShardsCount))

	impact3 := StorageSlotChange{VolumeSlots: 3, ShardSlots: 7}
	assert.Equal(t, int64(3), impact3.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be 3 + 7/%d = 3 (integer division)", erasure_coding.DataShardsCount))
}

// TestStorageSlotChange tests the new dual-level storage slot tracking
func TestStorageSlotChange(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Create test topology
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {
										DiskId:         0,
										Type:           "hdd",
										VolumeCount:    5,
										MaxVolumeCount: 20,
									},
								},
							},
							{
								Id: "10.0.0.2:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {
										DiskId:         0,
										Type:           "hdd",
										VolumeCount:    8,
										MaxVolumeCount: 15,
									},
								},
							},
						},
					},
				},
			},
		},
	}
	activeTopology.UpdateTopology(topologyInfo)
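
	// Baseline free capacity implied by the topology above (MaxVolumeCount - VolumeCount):
	// 10.0.0.1:8080 has 20 - 5 = 15 free volume slots and 10.0.0.2:8080 has 15 - 8 = 7.
	// The capacity assertions in Test 6 below build on these baselines.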

	// Test 1: Basic storage slot calculation
	ecSourceChange, ecTargetChange := CalculateTaskStorageImpact(TaskTypeErasureCoding, 1024*1024*1024)
	assert.Equal(t, int32(0), ecSourceChange.VolumeSlots, "EC source reserves with zero StorageSlotChange impact")
	assert.Equal(t, int32(0), ecSourceChange.ShardSlots, "EC source should have zero shard impact")
	assert.Equal(t, int32(0), ecTargetChange.VolumeSlots, "EC should not directly impact target volume slots")
	assert.Equal(t, int32(0), ecTargetChange.ShardSlots, "EC target should have zero shard impact from this simplified function")

	balSourceChange, balTargetChange := CalculateTaskStorageImpact(TaskTypeBalance, 1024*1024*1024)
	assert.Equal(t, int32(-1), balSourceChange.VolumeSlots, "Balance should free 1 volume slot on source")
	assert.Equal(t, int32(1), balTargetChange.VolumeSlots, "Balance should consume 1 volume slot on target")

	// Test 2: EC shard impact calculation
	shardImpact := CalculateECShardStorageImpact(3, 100*1024*1024) // 3 shards, 100MB each
	assert.Equal(t, int32(0), shardImpact.VolumeSlots, "EC shards should not impact volume slots")
	assert.Equal(t, int32(3), shardImpact.ShardSlots, "EC should impact 3 shard slots")

	// Test 3: Add EC task with shard-level tracking
	sourceServer := "10.0.0.1:8080"
	sourceDisk := uint32(0)
	shardDestinations := []string{"10.0.0.2:8080", "10.0.0.2:8080"}
	shardDiskIDs := []uint32{0, 0}
	expectedShardSize := int64(50 * 1024 * 1024)    // 50MB per shard
	originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB original

	// Create source specs (single replica in this test)
	sources := []TaskSourceSpec{
		{ServerID: sourceServer, DiskID: sourceDisk, CleanupType: CleanupVolumeReplica},
	}

	// Create destination specs
	destinations := make([]TaskDestinationSpec, len(shardDestinations))
	shardImpact = CalculateECShardStorageImpact(1, expectedShardSize)
	for i, dest := range shardDestinations {
		destinations[i] = TaskDestinationSpec{
			ServerID:      dest,
			DiskID:        shardDiskIDs[i],
			StorageImpact: &shardImpact,
			EstimatedSize: &expectedShardSize,
		}
	}

	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:       "ec_test",
		TaskType:     TaskTypeErasureCoding,
		VolumeID:     100,
		VolumeSize:   originalVolumeSize,
		Sources:      sources,
		Destinations: destinations,
	})
	assert.NoError(t, err, "Should add EC shard task successfully")

	// Test 4: Check storage impact on source (EC reserves with zero impact)
	sourceImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0)
	assert.Equal(t, int32(0), sourceImpact.VolumeSlots, "Source should show 0 volume slot impact (EC reserves with zero impact)")
	assert.Equal(t, int32(0), sourceImpact.ShardSlots, "Source should show 0 shard slot impact")

	// Test 5: Check storage impact on target (should gain shards)
	targetImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.2:8080", 0)
	assert.Equal(t, int32(0), targetImpact.VolumeSlots, "Target should show 0 volume slot impact (EC shards don't use volume slots)")
	assert.Equal(t, int32(2), targetImpact.ShardSlots, "Target should show 2 shard slot impact")

	// Test 6: Check effective capacity calculation (EC source reserves with zero StorageSlotChange)
	sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	targetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	// Source: 15 original available (EC source reserves with zero StorageSlotChange impact)
	assert.Equal(t, int64(15), sourceCapacity, "Source should have 15 available slots (EC source has zero StorageSlotChange impact)")
	// Target: 7 original available - (2 shards / 10) = 7 (since 2/10 rounds down to 0)
	assert.Equal(t, int64(7), targetCapacity, "Target should have 7 available slots (minimal shard impact)")

	// Test 7: Add traditional balance task for comparison
	err = activeTopology.AddPendingTask(TaskSpec{
		TaskID:     "balance_test",
		TaskType:   TaskTypeBalance,
		VolumeID:   101,
		VolumeSize: 512 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.2:8080", DiskID: 0},
		},
	})
	assert.NoError(t, err, "Should add balance task successfully")

	// Check updated impacts after adding balance task
	finalSourceImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0)
	finalTargetImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.2:8080", 0)
	assert.Equal(t, int32(-1), finalSourceImpact.VolumeSlots, "Source should show -1 volume slot impact (EC: 0, Balance: -1)")
	assert.Equal(t, int32(1), finalTargetImpact.VolumeSlots, "Target should show 1 volume slot impact (Balance: +1)")
	assert.Equal(t, int32(2), finalTargetImpact.ShardSlots, "Target should still show 2 shard slot impact (EC shards)")
}

// TestStorageSlotChangeCapacityCalculation tests the capacity calculation with mixed slot types
func TestStorageSlotChangeCapacityCalculation(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Create simple topology
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {
										DiskId:         0,
										Type:           "hdd",
										VolumeCount:    10,
										MaxVolumeCount: 100, // Large capacity for testing
									},
								},
							},
						},
					},
				},
			},
		},
	}
	activeTopology.UpdateTopology(topologyInfo)

	// Initial capacity
	initialCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	assert.Equal(t, int64(90), initialCapacity, "Should start with 90 available slots")

	// Add tasks with different shard slot impacts
	targetImpact1 := StorageSlotChange{VolumeSlots: 0, ShardSlots: 5} // Target gains 5 shards
	estimatedSize1 := int64(100 * 1024 * 1024)
	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:     "shard_test_1",
		TaskType:   TaskTypeErasureCoding,
		VolumeID:   100,
		VolumeSize: estimatedSize1,
		Sources: []TaskSourceSpec{
			{ServerID: "", DiskID: 0}, // Source not applicable here
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact1, EstimatedSize: &estimatedSize1},
		},
	})
	assert.NoError(t, err, "Should add shard test 1 successfully")

	// Capacity should be reduced by pending tasks via StorageSlotChange
	capacityAfterShards := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	// Dynamic calculation: 5 shards < DataShardsCount, so no volume impact
	expectedImpact5 := int64(5 / erasure_coding.DataShardsCount) // Should be 0 for any reasonable DataShardsCount
	assert.Equal(t, int64(90-expectedImpact5), capacityAfterShards, fmt.Sprintf("5 shard slots should consume %d volume slot equivalent (5/%d = %d)", expectedImpact5, erasure_coding.DataShardsCount, expectedImpact5))

	// Add more shards to reach threshold
	additionalShards := int32(erasure_coding.DataShardsCount)                        // Add exactly one volume worth of shards
	targetImpact2 := StorageSlotChange{VolumeSlots: 0, ShardSlots: additionalShards} // Target gains additional shards
	estimatedSize2 := int64(100 * 1024 * 1024)
	err = activeTopology.AddPendingTask(TaskSpec{
		TaskID:     "shard_test_2",
		TaskType:   TaskTypeErasureCoding,
		VolumeID:   101,
		VolumeSize: estimatedSize2,
		Sources: []TaskSourceSpec{
			{ServerID: "", DiskID: 0}, // Source not applicable here
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact2, EstimatedSize: &estimatedSize2},
		},
	})
	assert.NoError(t, err, "Should add shard test 2 successfully")

	// Dynamic calculation: (5 + DataShardsCount) shards should consume 1 volume slot
	totalShards := 5 + erasure_coding.DataShardsCount
	expectedImpact15 := int64(totalShards / erasure_coding.DataShardsCount) // Should be 1
	capacityAfterMoreShards := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	assert.Equal(t, int64(90-expectedImpact15), capacityAfterMoreShards, fmt.Sprintf("%d shard slots should consume %d volume slot equivalent (%d/%d = %d)", totalShards, expectedImpact15, totalShards, erasure_coding.DataShardsCount, expectedImpact15))

	// Add a full volume task
	targetImpact3 := StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} // Target gains 1 volume
	estimatedSize3 := int64(1024 * 1024 * 1024)
	err = activeTopology.AddPendingTask(TaskSpec{
		TaskID:     "volume_test",
		TaskType:   TaskTypeBalance,
		VolumeID:   102,
		VolumeSize: estimatedSize3,
		Sources: []TaskSourceSpec{
			{ServerID: "", DiskID: 0}, // Source not applicable here
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact3, EstimatedSize: &estimatedSize3},
		},
	})
	assert.NoError(t, err, "Should add volume test successfully")
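
	// The pending impact on this disk is now 1 volume slot + 15 shard slots. With the default
	// DataShardsCount of 10 this converts to 1 + 15/10 = 2 volume-slot equivalents (integer
	// division, consistent with the ToVolumeSlots assertions above), so the next check
	// expects 90 - 2 = 88.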

	// Capacity should be reduced by 1 more volume slot
	finalCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	assert.Equal(t, int64(88), finalCapacity, "1 volume + 15 shard slots should consume 2 volume slots total")

	// Verify the detailed storage impact
	plannedVol, reservedVol, plannedShard, reservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.1:8080", 0)
	assert.Equal(t, int64(1), plannedVol, "Should show 1 planned volume slot")
	assert.Equal(t, int64(0), reservedVol, "Should show 0 reserved volume slots")
	assert.Equal(t, int32(15), plannedShard, "Should show 15 planned shard slots")
	assert.Equal(t, int32(0), reservedShard, "Should show 0 reserved shard slots")
}

// TestECMultipleTargets demonstrates proper handling of EC operations with multiple targets
func TestECMultipleTargets(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Create test topology with multiple target nodes
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080", // Source
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 10, MaxVolumeCount: 50},
								},
							},
							{
								Id: "10.0.0.2:8080", // Target 1
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 30},
								},
							},
							{
								Id: "10.0.0.3:8080", // Target 2
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 8, MaxVolumeCount: 40},
								},
							},
							{
								Id: "10.0.0.4:8080", // Target 3
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 12, MaxVolumeCount: 35},
								},
							},
						},
					},
				},
			},
		},
	}
	activeTopology.UpdateTopology(topologyInfo)
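
	// Baseline free volume slots implied by the topology above (MaxVolumeCount - VolumeCount):
	// source 50-10=40, target 1: 30-5=25, target 2: 40-8=32, target 3: 35-12=23.
	// The capacity assertions at the end of this test start from these values.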

	// Demonstrate why CalculateTaskStorageImpact is insufficient for EC
	sourceChange, targetChange := CalculateTaskStorageImpact(TaskTypeErasureCoding, 1*1024*1024*1024)
	assert.Equal(t, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, sourceChange, "Source reserves with zero StorageSlotChange")
	assert.Equal(t, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, targetChange, "Target has zero impact from simplified function - insufficient for multi-target EC")

	// Proper way: Use AddPendingTask for multiple targets
	sourceServer := "10.0.0.1:8080"
	sourceDisk := uint32(0)

	// EC typically distributes shards across multiple targets
	shardDestinations := []string{
		"10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", // 5 shards to target 1
		"10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", // 5 shards to target 2
		"10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", // 4 shards to target 3
	}
	shardDiskIDs := make([]uint32, len(shardDestinations))
	for i := range shardDiskIDs {
		shardDiskIDs[i] = 0
	}

	// Create source specs (single replica in this test)
	sources := []TaskSourceSpec{
		{ServerID: sourceServer, DiskID: sourceDisk, CleanupType: CleanupVolumeReplica},
	}

	// Create destination specs
	destinations := make([]TaskDestinationSpec, len(shardDestinations))
	expectedShardSize := int64(50 * 1024 * 1024)
	shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
	for i, dest := range shardDestinations {
		destinations[i] = TaskDestinationSpec{
			ServerID:      dest,
			DiskID:        shardDiskIDs[i],
			StorageImpact: &shardImpact,
			EstimatedSize: &expectedShardSize,
		}
	}

	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:       "ec_multi_target",
		TaskType:     TaskTypeErasureCoding,
		VolumeID:     200,
		VolumeSize:   1 * 1024 * 1024 * 1024,
		Sources:      sources,
		Destinations: destinations,
	})
	assert.NoError(t, err, "Should add multi-target EC task successfully")

	// Verify source impact (EC reserves with zero StorageSlotChange)
	sourcePlannedVol, sourceReservedVol, sourcePlannedShard, sourceReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.1:8080", 0)
	assert.Equal(t, int64(0), sourcePlannedVol, "Source should reserve with zero volume slot impact")
	assert.Equal(t, int64(0), sourceReservedVol, "Source should not have reserved capacity yet")
	assert.Equal(t, int32(0), sourcePlannedShard, "Source should not have planned shard impact")
	assert.Equal(t, int32(0), sourceReservedShard, "Source should not have reserved shard impact")
	// Note: EstimatedSize tracking is no longer exposed via public API

	// Verify target impacts (planned, not yet reserved)
	target1PlannedVol, target1ReservedVol, target1PlannedShard, target1ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.2:8080", 0)
	target2PlannedVol, target2ReservedVol, target2PlannedShard, target2ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.3:8080", 0)
	target3PlannedVol, target3ReservedVol, target3PlannedShard, target3ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.4:8080", 0)

	assert.Equal(t, int64(0), target1PlannedVol, "Target 1 should not have planned volume impact")
	assert.Equal(t, int32(5), target1PlannedShard, "Target 1 should plan to receive 5 shards")
	assert.Equal(t, int64(0), target1ReservedVol, "Target 1 should not have reserved capacity yet")
	assert.Equal(t, int32(0), target1ReservedShard, "Target 1 should not have reserved shards yet")

	assert.Equal(t, int64(0), target2PlannedVol, "Target 2 should not have planned volume impact")
	assert.Equal(t, int32(5), target2PlannedShard, "Target 2 should plan to receive 5 shards")
	assert.Equal(t, int64(0), target2ReservedVol, "Target 2 should not have reserved capacity yet")
	assert.Equal(t, int32(0), target2ReservedShard, "Target 2 should not have reserved shards yet")

	assert.Equal(t, int64(0), target3PlannedVol, "Target 3 should not have planned volume impact")
	assert.Equal(t, int32(4), target3PlannedShard, "Target 3 should plan to receive 4 shards")
	assert.Equal(t, int64(0), target3ReservedVol, "Target 3 should not have reserved capacity yet")
	assert.Equal(t, int32(0), target3ReservedShard, "Target 3 should not have reserved shards yet")

	// Verify effective capacity (considers both pending and active tasks via StorageSlotChange)
	sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	target1Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	target2Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0)
	target3Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0)

	// Dynamic capacity calculations based on actual DataShardsCount
	expectedTarget1Impact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact
	expectedTarget2Impact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact
	expectedTarget3Impact := int64(4 / erasure_coding.DataShardsCount) // 4 shards impact

	assert.Equal(t, int64(40), sourceCapacity, "Source: 40 (EC source reserves with zero StorageSlotChange impact)")
	assert.Equal(t, int64(25-expectedTarget1Impact), target1Capacity, fmt.Sprintf("Target 1: 25 - %d (5 shards/%d = %d impact) = %d", expectedTarget1Impact, erasure_coding.DataShardsCount, expectedTarget1Impact, 25-expectedTarget1Impact))
	assert.Equal(t, int64(32-expectedTarget2Impact), target2Capacity, fmt.Sprintf("Target 2: 32 - %d (5 shards/%d = %d impact) = %d", expectedTarget2Impact, erasure_coding.DataShardsCount, expectedTarget2Impact, 32-expectedTarget2Impact))
	assert.Equal(t, int64(23-expectedTarget3Impact), target3Capacity, fmt.Sprintf("Target 3: 23 - %d (4 shards/%d = %d impact) = %d", expectedTarget3Impact, erasure_coding.DataShardsCount, expectedTarget3Impact, 23-expectedTarget3Impact))

	t.Logf("EC operation distributed %d shards across %d targets", len(shardDestinations), 3)
	t.Logf("Capacity impacts: EC source reserves with zero impact, Targets minimal (shards < %d)", erasure_coding.DataShardsCount)
}

// TestCapacityReservationCycle demonstrates the complete task lifecycle and capacity management
func TestCapacityReservationCycle(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Create test topology
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 10, MaxVolumeCount: 20},
								},
							},
							{
								Id: "10.0.0.2:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 15},
								},
							},
						},
					},
				},
			},
		},
	}
	activeTopology.UpdateTopology(topologyInfo)

	// Initial capacity
	sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	targetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	assert.Equal(t, int64(10), sourceCapacity, "Source initial capacity")
	assert.Equal(t, int64(10), targetCapacity, "Target initial capacity")

	// Step 1: Add pending task (should reserve capacity via StorageSlotChange)
	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:     "balance_test",
		TaskType:   TaskTypeBalance,
		VolumeID:   123,
		VolumeSize: 1 * 1024 * 1024 * 1024,
		Sources: []TaskSourceSpec{
			{ServerID: "10.0.0.1:8080", DiskID: 0},
		},
		Destinations: []TaskDestinationSpec{
			{ServerID: "10.0.0.2:8080", DiskID: 0},
		},
	})
	assert.NoError(t, err, "Should add balance test successfully")
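
	// A balance move frees one volume slot on the source and consumes one on the target
	// (CalculateTaskStorageImpact returns -1/+1 for TaskTypeBalance, as asserted in
	// TestStorageSlotChange above), so the pending task shifts the capacities 10 -> 11 and 10 -> 9.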

	sourceCapacityAfterPending := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	targetCapacityAfterPending := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	assert.Equal(t, int64(11), sourceCapacityAfterPending, "Source should gain capacity from pending balance task (balance source frees 1 slot)")
	assert.Equal(t, int64(9), targetCapacityAfterPending, "Target should consume capacity from pending task (balance reserves 1 slot)")

	// Verify planning capacity considers the same pending tasks
	planningDisks := activeTopology.GetDisksForPlanning(TaskTypeBalance, "", 1)
	assert.Len(t, planningDisks, 2, "Both disks should be available for planning")

	// Step 2: Assign task (capacity already reserved by pending task)
	err = activeTopology.AssignTask("balance_test")
	assert.NoError(t, err, "Should assign task successfully")

	sourceCapacityAfterAssign := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	targetCapacityAfterAssign := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	assert.Equal(t, int64(11), sourceCapacityAfterAssign, "Source capacity should remain the same (already accounted by pending)")
	assert.Equal(t, int64(9), targetCapacityAfterAssign, "Target capacity should remain the same (already accounted by pending)")

	// Note: Detailed task state tracking (planned vs reserved) is no longer exposed via the public API.
	// The important functionality is that capacity calculations remain consistent.

	// Step 3: Complete task (should release reserved capacity)
	err = activeTopology.CompleteTask("balance_test")
	assert.NoError(t, err, "Should complete task successfully")

	sourceCapacityAfterComplete := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	targetCapacityAfterComplete := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	assert.Equal(t, int64(10), sourceCapacityAfterComplete, "Source should return to original capacity")
	assert.Equal(t, int64(10), targetCapacityAfterComplete, "Target should return to original capacity")

	// Step 4: Apply actual storage change (simulates master topology update)
	activeTopology.ApplyActualStorageChange("10.0.0.1:8080", 0, -1) // Source loses 1 volume
	activeTopology.ApplyActualStorageChange("10.0.0.2:8080", 0, 1)  // Target gains 1 volume

	// Final capacity should reflect actual topology changes
	finalSourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	finalTargetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	assert.Equal(t, int64(11), finalSourceCapacity, "Source: (20-9) = 11 after losing 1 volume")
	assert.Equal(t, int64(9), finalTargetCapacity, "Target: (15-6) = 9 after gaining 1 volume")

	t.Logf("Capacity lifecycle with StorageSlotChange: Pending -> Assigned -> Released -> Applied")
	t.Logf("Source: 10 -> 11 -> 11 -> 10 -> 11 (freed by pending balance, then applied)")
	t.Logf("Target: 10 -> 9 -> 9 -> 10 -> 9 (reserved by pending, then applied)")
}

// TestReplicatedVolumeECOperations tests EC operations on replicated volumes
func TestReplicatedVolumeECOperations(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Setup cluster with multiple servers for replicated volumes
	activeTopology.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 10},
								},
							},
							{
								Id: "10.0.0.2:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 5},
								},
							},
							{
								Id: "10.0.0.3:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 3},
								},
							},
							{
								Id: "10.0.0.4:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 15},
								},
							},
							{
								Id: "10.0.0.5:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20},
								},
							},
							{
								Id: "10.0.0.6:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 25},
								},
							},
						},
					},
				},
			},
		},
	})
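
	// Free volume slots implied by the topology above (MaxVolumeCount - VolumeCount):
	// sources .1/.2/.3 start at 90/95/97 and destinations .4/.5/.6 at 85/80/75.
	// The capacity assertions below build on these baselines.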

	// Test: EC operation on a replicated volume (3 replicas)
	volumeID := uint32(300)
	originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB

	// Create source specs for the replicated volume (3 replicas)
	sources := []TaskSourceSpec{
		{ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 1
		{ServerID: "10.0.0.2:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 2
		{ServerID: "10.0.0.3:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 3
	}

	// EC destinations (shards distributed across different servers than sources)
	shardDestinations := []string{
		"10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", // 5 shards
		"10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 5 shards
		"10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", // 4 shards
	}
	shardDiskIDs := make([]uint32, len(shardDestinations))
	for i := range shardDiskIDs {
		shardDiskIDs[i] = 0
	}
	expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard

	// Create destination specs
	destinations := make([]TaskDestinationSpec, len(shardDestinations))
	shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
	for i, dest := range shardDestinations {
		destinations[i] = TaskDestinationSpec{
			ServerID:      dest,
			DiskID:        shardDiskIDs[i],
			StorageImpact: &shardImpact,
			EstimatedSize: &expectedShardSize,
		}
	}

	// Create EC task for the replicated volume
	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:       "ec_replicated",
		TaskType:     TaskTypeErasureCoding,
		VolumeID:     volumeID,
		VolumeSize:   originalVolumeSize,
		Sources:      sources,
		Destinations: destinations,
	})
	assert.NoError(t, err, "Should successfully create EC task for replicated volume")

	// Verify capacity impact on all source replicas (each should reserve with zero impact)
	for i, source := range sources {
		plannedVol, reservedVol, plannedShard, reservedShard, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID)
		assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Source replica %d should reserve with zero volume slot impact", i+1))
		assert.Equal(t, int64(0), reservedVol, fmt.Sprintf("Source replica %d should have no active volume slots", i+1))
		assert.Equal(t, int32(0), plannedShard, fmt.Sprintf("Source replica %d should have no planned shard slots", i+1))
		assert.Equal(t, int32(0), reservedShard, fmt.Sprintf("Source replica %d should have no active shard slots", i+1))
		// Note: EstimatedSize tracking is no longer exposed via public API
	}

	// Verify capacity impact on EC destinations
	destinationCounts := make(map[string]int)
	for _, dest := range shardDestinations {
		destinationCounts[dest]++
	}
	for serverID, expectedShards := range destinationCounts {
		plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, serverID, 0)
		assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Destination %s should have no planned volume slots", serverID))
		assert.Equal(t, int32(expectedShards), plannedShard, fmt.Sprintf("Destination %s should plan to receive %d shards", serverID, expectedShards))
	}

	// Verify effective capacity calculation for sources (should have zero EC impact)
	sourceCapacity1 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
	sourceCapacity2 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
	sourceCapacity3 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0)

	// All sources should have the same capacity as the baseline (EC source reserves with zero impact)
	assert.Equal(t, int64(90), sourceCapacity1, "Source 1: 100 - 10 (current) - 0 (EC source impact) = 90")
	assert.Equal(t, int64(95), sourceCapacity2, "Source 2: 100 - 5 (current) - 0 (EC source impact) = 95")
	assert.Equal(t, int64(97), sourceCapacity3, "Source 3: 100 - 3 (current) - 0 (EC source impact) = 97")

	// Verify effective capacity calculation for destinations (should be reduced by shard slots)
	destCapacity4 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0)
	destCapacity5 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.5:8080", 0)
	destCapacity6 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.6:8080", 0)

	// Dynamic shard impact calculations
	dest4ShardImpact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact
	dest5ShardImpact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact
	dest6ShardImpact := int64(4 / erasure_coding.DataShardsCount) // 4 shards impact

	// Destinations should be reduced by shard impact
	assert.Equal(t, int64(85-dest4ShardImpact), destCapacity4, fmt.Sprintf("Dest 4: 100 - 15 (current) - %d (5 shards/%d = %d impact) = %d", dest4ShardImpact, erasure_coding.DataShardsCount, dest4ShardImpact, 85-dest4ShardImpact))
	assert.Equal(t, int64(80-dest5ShardImpact), destCapacity5, fmt.Sprintf("Dest 5: 100 - 20 (current) - %d (5 shards/%d = %d impact) = %d", dest5ShardImpact, erasure_coding.DataShardsCount, dest5ShardImpact, 80-dest5ShardImpact))
	assert.Equal(t, int64(75-dest6ShardImpact), destCapacity6, fmt.Sprintf("Dest 6: 100 - 25 (current) - %d (4 shards/%d = %d impact) = %d", dest6ShardImpact, erasure_coding.DataShardsCount, dest6ShardImpact, 75-dest6ShardImpact))

	t.Logf("Replicated volume EC operation: %d source replicas, %d EC shards distributed across %d destinations",
		len(sources), len(shardDestinations), len(destinationCounts))
	t.Logf("Each source replica reserves with zero capacity impact, destinations receive EC shards")
}

// TestECWithOldShardCleanup tests EC operations that need to clean up old shards from previous failed attempts
func TestECWithOldShardCleanup(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Setup cluster with servers
	activeTopology.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 10},
								},
							},
							{
								Id: "10.0.0.2:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 5},
								},
							},
							{
								Id: "10.0.0.3:8080", // Had old EC shards from previous failed attempt
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 3},
								},
							},
							{
								Id: "10.0.0.4:8080", // Had old EC shards from previous failed attempt
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 7},
								},
							},
							{
								Id: "10.0.0.5:8080", // New EC destination
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20},
								},
							},
							{
								Id: "10.0.0.6:8080", // New EC destination
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 25},
								},
							},
						},
					},
				},
			},
		},
	})

	// Test: EC operation that needs to clean up both volume replicas AND old EC shards
	volumeID := uint32(400)
	originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB

	// Create source specs: volume replicas + old EC shard locations
	sources := []TaskSourceSpec{
		{ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Volume replica 1
		{ServerID: "10.0.0.2:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Volume replica 2
		{ServerID: "10.0.0.3:8080", DiskID: 0, CleanupType: CleanupECShards},      // Old EC shards from failed attempt
		{ServerID: "10.0.0.4:8080", DiskID: 0, CleanupType: CleanupECShards},      // Old EC shards from failed attempt
	}
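
	// CleanupVolumeReplica sources carry no capacity impact for EC (verified in the first loop
	// below), while CleanupECShards sources are expected to free erasure_coding.TotalShardsCount
	// shard slots each (verified in the second loop).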

	// EC destinations (new complete set of shards)
	shardDestinations := []string{
		"10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 5 shards
		"10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 4 more shards (9 total)
		"10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", // 5 shards
	}
	shardDiskIDs := make([]uint32, len(shardDestinations))
	for i := range shardDiskIDs {
		shardDiskIDs[i] = 0
	}
	expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard

	// Create destination specs
	destinations := make([]TaskDestinationSpec, len(shardDestinations))
	shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
	for i, dest := range shardDestinations {
		destinations[i] = TaskDestinationSpec{
			ServerID:      dest,
			DiskID:        shardDiskIDs[i],
			StorageImpact: &shardImpact,
			EstimatedSize: &expectedShardSize,
		}
	}

	// Create EC task that cleans up both volume replicas and old EC shards
	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:       "ec_cleanup",
		TaskType:     TaskTypeErasureCoding,
		VolumeID:     volumeID,
		VolumeSize:   originalVolumeSize,
		Sources:      sources,
		Destinations: destinations,
	})
	assert.NoError(t, err, "Should successfully create EC task with mixed cleanup types")

	// Verify capacity impact on volume replica sources (zero impact for EC)
	for i := 0; i < 2; i++ {
		source := sources[i]
		plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID)
		assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Volume replica source %d should have zero volume slot impact", i+1))
		assert.Equal(t, int32(0), plannedShard, fmt.Sprintf("Volume replica source %d should have zero shard slot impact", i+1))
		// Note: EstimatedSize tracking is no longer exposed via public API
	}

	// Verify capacity impact on old EC shard sources (should free shard slots)
	for i := 2; i < 4; i++ {
		source := sources[i]
		plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID)
		assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("EC shard source %d should have zero volume slot impact", i+1))
		assert.Equal(t, int32(-erasure_coding.TotalShardsCount), plannedShard, fmt.Sprintf("EC shard source %d should free %d shard slots", i+1, erasure_coding.TotalShardsCount))
		// Note: EstimatedSize tracking is no longer exposed via public API
	}

	// Verify capacity impact on new EC destinations
	destPlan5, _, destShard5, _, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.5:8080", 0)
	destPlan6, _, destShard6, _, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.6:8080", 0)
	assert.Equal(t, int64(0), destPlan5, "New EC destination 5 should have no planned volume slots")
	assert.Equal(t, int32(9), destShard5, "New EC destination 5 should plan to receive 9 shards")
	assert.Equal(t, int64(0), destPlan6, "New EC destination 6 should have no planned volume slots")
	assert.Equal(t, int32(5), destShard6, "New EC destination 6 should plan to receive 5 shards")

	// Verify effective capacity calculation shows proper impact
	capacity3 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0) // Freeing old EC shards
	capacity4 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0) // Freeing old EC shards
	capacity5 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.5:8080", 0) // Receiving new EC shards
	capacity6 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.6:8080", 0) // Receiving new EC shards
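
	// Freeing erasure_coding.TotalShardsCount shard slots converts to one volume-slot equivalent
	// here (14/10 with the default constants, truncated toward zero), which is why a "+ 1" appears
	// in the two assertions below; the exact rounding is inferred from those expected values.
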
	// Servers freeing old EC shards should have INCREASED capacity (freed shard slots provide capacity)
	assert.Equal(t, int64(98), capacity3, fmt.Sprintf("Server 3: 100 - 3 (current) + 1 (freeing %d shards) = 98", erasure_coding.TotalShardsCount))
	assert.Equal(t, int64(94), capacity4, fmt.Sprintf("Server 4: 100 - 7 (current) + 1 (freeing %d shards) = 94", erasure_coding.TotalShardsCount))

	// Servers receiving new EC shards should have slightly reduced capacity
	server5ShardImpact := int64(9 / erasure_coding.DataShardsCount) // 9 shards impact
	server6ShardImpact := int64(5 / erasure_coding.DataShardsCount) // 5 shards impact
	assert.Equal(t, int64(80-server5ShardImpact), capacity5, fmt.Sprintf("Server 5: 100 - 20 (current) - %d (9 shards/%d = %d impact) = %d", server5ShardImpact, erasure_coding.DataShardsCount, server5ShardImpact, 80-server5ShardImpact))
	assert.Equal(t, int64(75-server6ShardImpact), capacity6, fmt.Sprintf("Server 6: 100 - 25 (current) - %d (5 shards/%d = %d impact) = %d", server6ShardImpact, erasure_coding.DataShardsCount, server6ShardImpact, 75-server6ShardImpact))

	t.Logf("EC operation with cleanup: %d volume replicas + %d old EC shard locations → %d new EC shards",
		2, 2, len(shardDestinations))
	t.Logf("Volume sources have zero impact, old EC shard sources free capacity, new destinations consume shard slots")
}

// TestDetailedCapacityCalculations tests the new StorageSlotChange-based capacity calculation functions
func TestDetailedCapacityCalculations(t *testing.T) {
	activeTopology := NewActiveTopology(10)

	// Setup cluster
	activeTopology.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{
			{
				Id: "dc1",
				RackInfos: []*master_pb.RackInfo{
					{
						Id: "rack1",
						DataNodeInfos: []*master_pb.DataNodeInfo{
							{
								Id: "10.0.0.1:8080",
								DiskInfos: map[string]*master_pb.DiskInfo{
									"0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20},
								},
							},
						},
					},
				},
			},
		},
	})

	// Test: Add an EC task and check detailed capacity
	sources := []TaskSourceSpec{
		{ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica},
	}
	shardDestinations := []string{"10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080"}
	shardDiskIDs := []uint32{0, 0, 0, 0, 0}

	// Create destination specs
	destinations := make([]TaskDestinationSpec, len(shardDestinations))
	expectedShardSize := int64(50 * 1024 * 1024)
	shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
	for i, dest := range shardDestinations {
		destinations[i] = TaskDestinationSpec{
			ServerID:      dest,
			DiskID:        shardDiskIDs[i],
			StorageImpact: &shardImpact,
			EstimatedSize: &expectedShardSize,
		}
	}

	err := activeTopology.AddPendingTask(TaskSpec{
		TaskID:       "detailed_test",
		TaskType:     TaskTypeErasureCoding,
		VolumeID:     500,
		VolumeSize:   1024 * 1024 * 1024,
		Sources:      sources,
		Destinations: destinations,
	})
	assert.NoError(t, err, "Should add EC task successfully")

	// Test the new detailed capacity function
	detailedCapacity := activeTopology.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0)
	simpleCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)

	// The simple capacity should match the volume slots from detailed capacity
	assert.Equal(t, int64(detailedCapacity.VolumeSlots), simpleCapacity, "Simple capacity should match detailed volume slots")

	// Verify detailed capacity has both volume and shard information
	assert.Equal(t, int32(80), detailedCapacity.VolumeSlots, "Should have 80 available volume slots (100 - 20 current, no volume impact from EC)")
	assert.Equal(t, int32(-5), detailedCapacity.ShardSlots, "Should show -5 available shard slots (5 destination shards)")

	// Verify capacity impact
	capacityImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0)
	assert.Equal(t, int32(0), capacityImpact.VolumeSlots, "EC source should have zero volume slot impact")
	assert.Equal(t, int32(5), capacityImpact.ShardSlots, "Should have positive shard slot impact (consuming 5 shards)")

	t.Logf("Detailed capacity calculation: VolumeSlots=%d, ShardSlots=%d",
		detailedCapacity.VolumeSlots, detailedCapacity.ShardSlots)
	t.Logf("Capacity impact: VolumeSlots=%d, ShardSlots=%d",
		capacityImpact.VolumeSlots, capacityImpact.ShardSlots)
	t.Logf("Simple capacity (backward compatible): %d", simpleCapacity)
}

// TestStorageSlotChangeConversions tests the conversion and accommodation methods for StorageSlotChange.
// This test is designed to work with any value of erasure_coding.DataShardsCount, making it
// compatible with custom erasure coding configurations.
func TestStorageSlotChangeConversions(t *testing.T) {
	// Get the actual erasure coding constants for dynamic testing
	dataShards := int32(erasure_coding.DataShardsCount)

	// Test conversion constants
	assert.Equal(t, int(dataShards), ShardsPerVolumeSlot, fmt.Sprintf("Should use erasure_coding.DataShardsCount (%d) shards per volume slot", dataShards))

	// Test basic conversions using dynamic values
	volumeOnly := StorageSlotChange{VolumeSlots: 5, ShardSlots: 0}
	shardOnly := StorageSlotChange{VolumeSlots: 0, ShardSlots: 2 * dataShards} // 2 volume equivalents in shards
	mixed := StorageSlotChange{VolumeSlots: 2, ShardSlots: dataShards + 5}     // 2 volumes + 1.5 volume equivalents in shards

	// Test ToVolumeSlots conversion - these should work regardless of DataShardsCount value
	assert.Equal(t, int64(5), volumeOnly.ToVolumeSlots(), "5 volume slots = 5 volume slots")
	assert.Equal(t, int64(2), shardOnly.ToVolumeSlots(), fmt.Sprintf("%d shard slots = 2 volume slots", 2*dataShards))
	expectedMixedVolumes := int64(2 + (dataShards+5)/dataShards) // 2 + floor((DataShardsCount+5)/DataShardsCount)
	assert.Equal(t, expectedMixedVolumes, mixed.ToVolumeSlots(), fmt.Sprintf("2 volume + %d shards = %d volume slots", dataShards+5, expectedMixedVolumes))

	// Test ToShardSlots conversion
	expectedVolumeShards := int32(5 * dataShards)
	assert.Equal(t, expectedVolumeShards, volumeOnly.ToShardSlots(), fmt.Sprintf("5 volume slots = %d shard slots", expectedVolumeShards))
	assert.Equal(t, 2*dataShards, shardOnly.ToShardSlots(), fmt.Sprintf("%d shard slots = %d shard slots", 2*dataShards, 2*dataShards))
	expectedMixedShards := int32(2*dataShards + dataShards + 5)
	assert.Equal(t, expectedMixedShards, mixed.ToShardSlots(), fmt.Sprintf("2 volume + %d shards = %d shard slots", dataShards+5, expectedMixedShards))

	// Test capacity accommodation checks using shard-based comparison
	availableVolumes := int32(10)
	available := StorageSlotChange{VolumeSlots: availableVolumes, ShardSlots: 0}              // availableVolumes * dataShards shard slots available
	smallVolumeRequest := StorageSlotChange{VolumeSlots: 3, ShardSlots: 0}                    // Needs 3 * dataShards shard slots
	largeVolumeRequest := StorageSlotChange{VolumeSlots: availableVolumes + 5, ShardSlots: 0} // Needs more than available
	shardRequest := StorageSlotChange{VolumeSlots: 0, ShardSlots: 5 * dataShards}             // Needs 5 volume equivalents in shards
	mixedRequest := StorageSlotChange{VolumeSlots: 8, ShardSlots: 3 * dataShards}             // Needs 11 volume equivalents total

	smallShardsNeeded := 3 * dataShards
	availableShards := availableVolumes * dataShards
	largeShardsNeeded := (availableVolumes + 5) * dataShards
	shardShardsNeeded := 5 * dataShards
	mixedShardsNeeded := 8*dataShards + 3*dataShards
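
	// CanAccommodate is exercised below as a shard-unit comparison: a request is expected to fit
	// when its total shard-slot equivalent does not exceed the available total (inferred from the
	// expected outcomes, e.g. 30 <= 100 fits while 150 > 100 does not, with DataShardsCount = 10).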
	assert.True(t, available.CanAccommodate(smallVolumeRequest), fmt.Sprintf("Should accommodate small volume request (%d <= %d shards)", smallShardsNeeded, availableShards))
	assert.False(t, available.CanAccommodate(largeVolumeRequest), fmt.Sprintf("Should NOT accommodate large volume request (%d > %d shards)", largeShardsNeeded, availableShards))
	assert.True(t, available.CanAccommodate(shardRequest), fmt.Sprintf("Should accommodate shard request (%d <= %d shards)", shardShardsNeeded, availableShards))
	assert.False(t, available.CanAccommodate(mixedRequest), fmt.Sprintf("Should NOT accommodate mixed request (%d > %d shards)", mixedShardsNeeded, availableShards))

	t.Logf("Conversion tests passed: %d shards = 1 volume slot", ShardsPerVolumeSlot)
	t.Logf("Mixed capacity (%d volumes + %d shards) = %d equivalent volume slots",
		mixed.VolumeSlots, mixed.ShardSlots, mixed.ToVolumeSlots())
	t.Logf("Available capacity (%d volumes) = %d total shard slots",
		available.VolumeSlots, available.ToShardSlots())
	t.Logf("NOTE: This test adapts automatically to erasure_coding.DataShardsCount = %d", erasure_coding.DataShardsCount)
}