ec_shard_management.go

package dash

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// matchesCollection checks if a volume/EC volume collection matches the filter collection.
// Handles the special case where the empty collection ("") represents the "default" collection.
func matchesCollection(volumeCollection, filterCollection string) bool {
	// Both empty means the default collection matches the default filter
	if volumeCollection == "" && filterCollection == "" {
		return true
	}
	// Direct string match for named collections
	return volumeCollection == filterCollection
}

// GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering.
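// Results are filtered by collection, sorted, and then paginated; shard completeness is
// computed per EC volume across all servers. A typical call from a handler might look like
// this (receiver name and argument values are illustrative):
//
//	data, err := adminServer.GetClusterEcShards(1, 100, "volume_id", "asc", "")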
func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) {
	// Set defaults
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 1000 {
		pageSize = 100
	}
	if sortBy == "" {
		sortBy = "volume_id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}

	var ecShards []EcShardWithInfo
	volumeShardsMap := make(map[uint32]map[int]bool) // volumeId -> set of shards present
	volumesWithAllShards := 0
	volumesWithMissingShards := 0

	// Get detailed EC shard information via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process EC shard information
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								volumeId := ecShardInfo.Id

								// Initialize volume shards map if needed
								if volumeShardsMap[volumeId] == nil {
									volumeShardsMap[volumeId] = make(map[int]bool)
								}

								// Create individual shard entries for each shard this server has
								shardBits := ecShardInfo.EcIndexBits
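								// EcIndexBits is a bitmap: bit shardId is set when this server holds that shard.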
								for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
									if (shardBits & (1 << uint(shardId))) != 0 {
										// Mark this shard as present for this volume
										volumeShardsMap[volumeId][shardId] = true

										ecShard := EcShardWithInfo{
											VolumeID:     volumeId,
											ShardID:      uint32(shardId),
											Collection:   ecShardInfo.Collection,
											Size:         0, // EC shards don't have individual size in the API response
											Server:       node.Id,
											DataCenter:   dc.Id,
											Rack:         rack.Id,
											DiskType:     diskInfo.Type,
											ModifiedTime: 0, // Not available in current API
											EcIndexBits:  ecShardInfo.EcIndexBits,
											ShardCount:   getShardCount(ecShardInfo.EcIndexBits),
										}
										ecShards = append(ecShards, ecShard)
									}
								}
							}
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Calculate volume-level completeness (across all servers)
	volumeCompleteness := make(map[uint32]bool)
	volumeMissingShards := make(map[uint32][]int)
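	// A volume is complete only when every shard ID in [0, erasure_coding.TotalShardsCount) is present on some server.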
	for volumeId, shardsPresent := range volumeShardsMap {
		var missingShards []int
		shardCount := len(shardsPresent)

		// Find which shards are missing for this volume across ALL servers
		for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
			if !shardsPresent[shardId] {
				missingShards = append(missingShards, shardId)
			}
		}

		isComplete := (shardCount == erasure_coding.TotalShardsCount)
		volumeCompleteness[volumeId] = isComplete
		volumeMissingShards[volumeId] = missingShards

		if isComplete {
			volumesWithAllShards++
		} else {
			volumesWithMissingShards++
		}
	}

	// Update completeness info for each shard based on volume-level completeness
	for i := range ecShards {
		volumeId := ecShards[i].VolumeID
		ecShards[i].IsComplete = volumeCompleteness[volumeId]
		ecShards[i].MissingShards = volumeMissingShards[volumeId]
	}

	// Filter by collection if specified
	if collection != "" {
		var filteredShards []EcShardWithInfo
		for _, shard := range ecShards {
			if shard.Collection == collection {
				filteredShards = append(filteredShards, shard)
			}
		}
		ecShards = filteredShards
	}

	// Sort the results
	sortEcShards(ecShards, sortBy, sortOrder)

	// Calculate statistics for conditional display
	dataCenters := make(map[string]bool)
	racks := make(map[string]bool)
	collections := make(map[string]bool)
	for _, shard := range ecShards {
		dataCenters[shard.DataCenter] = true
		racks[shard.Rack] = true
		if shard.Collection != "" {
			collections[shard.Collection] = true
		}
	}

	// Pagination
	totalShards := len(ecShards)
	totalPages := (totalShards + pageSize - 1) / pageSize
	startIndex := (page - 1) * pageSize
	endIndex := startIndex + pageSize
	if endIndex > totalShards {
		endIndex = totalShards
	}
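	// A page beyond the last one yields an empty result rather than an error.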
	if startIndex >= totalShards {
		startIndex = 0
		endIndex = 0
	}
	paginatedShards := ecShards[startIndex:endIndex]

	// Build response
	data := &ClusterEcShardsData{
		EcShards:     paginatedShards,
		TotalShards:  totalShards,
		TotalVolumes: len(volumeShardsMap),
		LastUpdated:  time.Now(),

		// Pagination
		CurrentPage: page,
		TotalPages:  totalPages,
		PageSize:    pageSize,

		// Sorting
		SortBy:    sortBy,
		SortOrder: sortOrder,

		// Statistics
		DataCenterCount: len(dataCenters),
		RackCount:       len(racks),
		CollectionCount: len(collections),

		// Conditional display flags
		ShowDataCenterColumn: len(dataCenters) > 1,
		ShowRackColumn:       len(racks) > 1,
		ShowCollectionColumn: len(collections) > 1 || collection != "",

		// Filtering
		FilterCollection: collection,

		// EC specific statistics
		ShardsPerVolume:          make(map[uint32]int), // populated below
		VolumesWithAllShards:     volumesWithAllShards,
		VolumesWithMissingShards: volumesWithMissingShards,
	}

	// Populate ShardsPerVolume for the response
	for volumeId, shardsPresent := range volumeShardsMap {
		data.ShardsPerVolume[volumeId] = len(shardsPresent)
	}

	// Set single values when only one exists
	if len(dataCenters) == 1 {
		for dc := range dataCenters {
			data.SingleDataCenter = dc
			break
		}
	}
	if len(racks) == 1 {
		for rack := range racks {
			data.SingleRack = rack
			break
		}
	}
	if len(collections) == 1 {
		for col := range collections {
			data.SingleCollection = col
			break
		}
	}

	return data, nil
}

// GetClusterEcVolumes retrieves cluster EC volumes data grouped by volume ID with shard locations.
func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcVolumesData, error) {
	// Set defaults
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 1000 {
		pageSize = 100
	}
	if sortBy == "" {
		sortBy = "volume_id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}
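
	// volumeData aggregates shard information per EC volume across all servers in the topology.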
	volumeData := make(map[uint32]*EcVolumeWithShards)
	totalShards := 0

	// Get detailed EC shard information via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process EC shard information
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								volumeId := ecShardInfo.Id

								// Initialize volume data if needed
								if volumeData[volumeId] == nil {
									volumeData[volumeId] = &EcVolumeWithShards{
										VolumeID:       volumeId,
										Collection:     ecShardInfo.Collection,
										TotalShards:    0,
										IsComplete:     false,
										MissingShards:  []int{},
										ShardLocations: make(map[int]string),
										ShardSizes:     make(map[int]int64),
										DataCenters:    []string{},
										Servers:        []string{},
										Racks:          []string{},
									}
								}
								volume := volumeData[volumeId]

								// Track data centers and servers
								dcExists := false
								for _, existingDc := range volume.DataCenters {
									if existingDc == dc.Id {
										dcExists = true
										break
									}
								}
								if !dcExists {
									volume.DataCenters = append(volume.DataCenters, dc.Id)
								}

								serverExists := false
								for _, existingServer := range volume.Servers {
									if existingServer == node.Id {
										serverExists = true
										break
									}
								}
								if !serverExists {
									volume.Servers = append(volume.Servers, node.Id)
								}

								// Track racks
								rackExists := false
								for _, existingRack := range volume.Racks {
									if existingRack == rack.Id {
										rackExists = true
										break
									}
								}
								if !rackExists {
									volume.Racks = append(volume.Racks, rack.Id)
								}

								// Process each shard this server has for this volume
								shardBits := ecShardInfo.EcIndexBits
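								// Each set bit in EcIndexBits marks a shard held by this server. ShardLocations keeps
								// one server per shard ID (the last one seen wins), while totalShards counts every
								// reported copy.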
								for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
									if (shardBits & (1 << uint(shardId))) != 0 {
										// Record shard location
										volume.ShardLocations[shardId] = node.Id
										totalShards++
									}
								}
							}
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Collect shard size information from volume servers
	for volumeId, volume := range volumeData {
		// Collect the distinct servers hosting this volume so each is queried only once
		serverHasVolume := make(map[string]bool)
		for _, server := range volume.Servers {
			serverHasVolume[server] = true
		}

		// Query each server for shard sizes
		for server := range serverHasVolume {
			err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
				resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
					VolumeId: volumeId,
				})
				if err != nil {
					glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeId, err)
					return nil // Continue with other servers, don't fail the entire request
				}

				// Update shard sizes
				for _, shardInfo := range resp.EcShardInfos {
					volume.ShardSizes[int(shardInfo.ShardId)] = shardInfo.Size
				}
				return nil
			})
			if err != nil {
				glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
			}
		}
	}
	// Calculate completeness for each volume
	completeVolumes := 0
	incompleteVolumes := 0
	for _, volume := range volumeData {
		volume.TotalShards = len(volume.ShardLocations)

		// Find missing shards
		var missingShards []int
		for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
			if _, exists := volume.ShardLocations[shardId]; !exists {
				missingShards = append(missingShards, shardId)
			}
		}
		volume.MissingShards = missingShards
		volume.IsComplete = (len(missingShards) == 0)

		if volume.IsComplete {
			completeVolumes++
		} else {
			incompleteVolumes++
		}
	}

	// Convert map to slice
	var ecVolumes []EcVolumeWithShards
	for _, volume := range volumeData {
		// Filter by collection if specified
		if collection == "" || matchesCollection(volume.Collection, collection) {
			ecVolumes = append(ecVolumes, *volume)
		}
	}

	// Sort the results
	sortEcVolumes(ecVolumes, sortBy, sortOrder)

	// Calculate statistics for conditional display
	dataCenters := make(map[string]bool)
	collections := make(map[string]bool)
	for _, volume := range ecVolumes {
		for _, dc := range volume.DataCenters {
			dataCenters[dc] = true
		}
		if volume.Collection != "" {
			collections[volume.Collection] = true
		}
	}

	// Pagination
	totalVolumes := len(ecVolumes)
	totalPages := (totalVolumes + pageSize - 1) / pageSize
	startIndex := (page - 1) * pageSize
	endIndex := startIndex + pageSize
	if endIndex > totalVolumes {
		endIndex = totalVolumes
	}
	if startIndex >= totalVolumes {
		startIndex = 0
		endIndex = 0
	}
	paginatedVolumes := ecVolumes[startIndex:endIndex]

	// Build response
	data := &ClusterEcVolumesData{
		EcVolumes:    paginatedVolumes,
		TotalVolumes: totalVolumes,
		LastUpdated:  time.Now(),

		// Pagination
		Page:       page,
		PageSize:   pageSize,
		TotalPages: totalPages,

		// Sorting
		SortBy:    sortBy,
		SortOrder: sortOrder,

		// Filtering
		Collection: collection,

		// Conditional display flags
		ShowDataCenterColumn: len(dataCenters) > 1,
		ShowRackColumn:       false, // We don't track racks in this view for simplicity
		ShowCollectionColumn: len(collections) > 1 || collection != "",

		// Statistics
		CompleteVolumes:   completeVolumes,
		IncompleteVolumes: incompleteVolumes,
		TotalShards:       totalShards,
	}

	return data, nil
}

// sortEcVolumes sorts EC volumes based on the specified field and order.
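// Ties are broken by volume ID, so the resulting order is deterministic for a given input.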
func sortEcVolumes(volumes []EcVolumeWithShards, sortBy string, sortOrder string) {
	sort.Slice(volumes, func(i, j int) bool {
		var less bool
		switch sortBy {
		case "volume_id":
			less = volumes[i].VolumeID < volumes[j].VolumeID
		case "collection":
			if volumes[i].Collection == volumes[j].Collection {
				less = volumes[i].VolumeID < volumes[j].VolumeID
			} else {
				less = volumes[i].Collection < volumes[j].Collection
			}
		case "total_shards":
			if volumes[i].TotalShards == volumes[j].TotalShards {
				less = volumes[i].VolumeID < volumes[j].VolumeID
			} else {
				less = volumes[i].TotalShards < volumes[j].TotalShards
			}
		case "completeness":
			// Complete volumes first, then by volume ID
			if volumes[i].IsComplete == volumes[j].IsComplete {
				less = volumes[i].VolumeID < volumes[j].VolumeID
			} else {
				less = volumes[i].IsComplete && !volumes[j].IsComplete
			}
		default:
			less = volumes[i].VolumeID < volumes[j].VolumeID
		}

		if sortOrder == "desc" {
			return !less
		}
		return less
	})
}

// getShardCount returns the number of shards represented by the bitmap.
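// It is a population count over the low erasure_coding.TotalShardsCount bits of ecIndexBits.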
func getShardCount(ecIndexBits uint32) int {
	count := 0
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if (ecIndexBits & (1 << uint(i))) != 0 {
			count++
		}
	}
	return count
}

// getMissingShards returns a slice of missing shard IDs for a volume.
func getMissingShards(ecIndexBits uint32) []int {
	var missing []int
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if (ecIndexBits & (1 << uint(i))) == 0 {
			missing = append(missing, i)
		}
	}
	return missing
}

// sortEcShards sorts EC shards based on the specified field and order.
func sortEcShards(shards []EcShardWithInfo, sortBy string, sortOrder string) {
	sort.Slice(shards, func(i, j int) bool {
		var less bool
		switch sortBy {
		case "shard_id":
			less = shards[i].ShardID < shards[j].ShardID
		case "server":
			if shards[i].Server == shards[j].Server {
				less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
			} else {
				less = shards[i].Server < shards[j].Server
			}
		case "data_center":
			if shards[i].DataCenter == shards[j].DataCenter {
				less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
			} else {
				less = shards[i].DataCenter < shards[j].DataCenter
			}
		case "rack":
			if shards[i].Rack == shards[j].Rack {
				less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
			} else {
				less = shards[i].Rack < shards[j].Rack
			}
		default:
			less = shards[i].ShardID < shards[j].ShardID
		}

		if sortOrder == "desc" {
			return !less
		}
		return less
	})
}

// GetEcVolumeDetails retrieves detailed information about a specific EC volume.
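// Shard sizes are collected best-effort from each volume server that hosts the volume; a
// server that cannot be reached simply leaves its shard sizes at zero. An illustrative call
// (receiver name and argument values are examples):
//
//	details, err := adminServer.GetEcVolumeDetails(42, "shard_id", "asc")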
func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrder string) (*EcVolumeDetailsData, error) {
	// Set defaults
	if sortBy == "" {
		sortBy = "shard_id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}

	var shards []EcShardWithInfo
	var collection string
	dataCenters := make(map[string]bool)
	servers := make(map[string]bool)

	// Get detailed EC shard information for the specific volume via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process EC shard information for this specific volume
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								if ecShardInfo.Id == volumeID {
									collection = ecShardInfo.Collection
									dataCenters[dc.Id] = true
									servers[node.Id] = true

									// Create individual shard entries for each shard this server has
									shardBits := ecShardInfo.EcIndexBits
									for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
										if (shardBits & (1 << uint(shardId))) != 0 {
											ecShard := EcShardWithInfo{
												VolumeID:     ecShardInfo.Id,
												ShardID:      uint32(shardId),
												Collection:   ecShardInfo.Collection,
												Size:         0, // EC shards don't have individual size in the API response
												Server:       node.Id,
												DataCenter:   dc.Id,
												Rack:         rack.Id,
												DiskType:     diskInfo.Type,
												ModifiedTime: 0, // Not available in current API
												EcIndexBits:  ecShardInfo.EcIndexBits,
												ShardCount:   getShardCount(ecShardInfo.EcIndexBits),
											}
											shards = append(shards, ecShard)
										}
									}
								}
							}
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	if len(shards) == 0 {
		return nil, fmt.Errorf("EC volume %d not found", volumeID)
	}

	// Collect shard size information from volume servers
	shardSizeMap := make(map[string]map[uint32]uint64) // server -> shardId -> size
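	// shardSizeMap doubles as a "visited" set: each server that responds successfully is queried only once.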
	for _, shard := range shards {
		server := shard.Server
		if _, exists := shardSizeMap[server]; !exists {
			// Query this server for shard sizes
			err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
				resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
					VolumeId: volumeID,
				})
				if err != nil {
					glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeID, err)
					return nil // Continue with other servers, don't fail the entire request
				}

				// Store shard sizes for this server
				shardSizeMap[server] = make(map[uint32]uint64)
				for _, shardInfo := range resp.EcShardInfos {
					shardSizeMap[server][shardInfo.ShardId] = uint64(shardInfo.Size)
				}
				return nil
			})
			if err != nil {
				glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
			}
		}
	}

	// Update shard sizes in the shards array
	for i := range shards {
		server := shards[i].Server
		shardId := shards[i].ShardID
		if serverSizes, exists := shardSizeMap[server]; exists {
			if size, exists := serverSizes[shardId]; exists {
				shards[i].Size = size
			}
		}
	}

	// Calculate completeness based on unique shard IDs
	foundShards := make(map[int]bool)
	for _, shard := range shards {
		foundShards[int(shard.ShardID)] = true
	}
	totalUniqueShards := len(foundShards)
	isComplete := (totalUniqueShards == erasure_coding.TotalShardsCount)

	// Calculate missing shards
	var missingShards []int
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if !foundShards[i] {
			missingShards = append(missingShards, i)
		}
	}

	// Update completeness info for each shard
	for i := range shards {
		shards[i].IsComplete = isComplete
		shards[i].MissingShards = missingShards
	}

	// Sort shards based on parameters
	sortEcShards(shards, sortBy, sortOrder)

	// Convert maps to slices
	var dcList []string
	for dc := range dataCenters {
		dcList = append(dcList, dc)
	}
	var serverList []string
	for server := range servers {
		serverList = append(serverList, server)
	}

	data := &EcVolumeDetailsData{
		VolumeID:      volumeID,
		Collection:    collection,
		Shards:        shards,
		TotalShards:   totalUniqueShards,
		IsComplete:    isComplete,
		MissingShards: missingShards,
		DataCenters:   dcList,
		Servers:       serverList,
		LastUpdated:   time.Now(),
		SortBy:        sortBy,
		SortOrder:     sortOrder,
	}

	return data, nil
}