volume_management.go

package dash

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// GetClusterVolumes retrieves cluster volumes data with pagination, sorting, and filtering
func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterVolumesData, error) {
	// Set defaults
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 1000 {
		pageSize = 100
	}
	if sortBy == "" {
		sortBy = "id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}

	var volumes []VolumeWithTopology
	var totalSize int64
	var cachedTopologyInfo *master_pb.TopologyInfo

	// Get detailed volume information via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		// Cache the topology info for reuse
		cachedTopologyInfo = resp.TopologyInfo

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process regular volumes
							for _, volInfo := range diskInfo.VolumeInfos {
								volume := VolumeWithTopology{
									VolumeInformationMessage: volInfo,
									Server:                   node.Id,
									DataCenter:               dc.Id,
									Rack:                     rack.Id,
								}
								volumes = append(volumes, volume)
								totalSize += int64(volInfo.Size)
							}
							// Process EC shards in the same loop
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								// Add all shard sizes for this EC volume
								for _, shardSize := range ecShardInfo.ShardSizes {
									totalSize += shardSize
								}
							}
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Filter by collection if specified
	if collection != "" {
		var filteredVolumes []VolumeWithTopology
		var filteredTotalSize int64
		var filteredEcTotalSize int64
		for _, volume := range volumes {
			if matchesCollection(volume.Collection, collection) {
				filteredVolumes = append(filteredVolumes, volume)
				filteredTotalSize += int64(volume.Size)
			}
		}
		// Filter EC shard sizes by collection using the topology info cached above.
		// This reuses the VolumeList response from the earlier traversal instead of issuing a second gRPC call.
		if cachedTopologyInfo != nil {
			for _, dc := range cachedTopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								if matchesCollection(ecShardInfo.Collection, collection) {
									// Add all shard sizes for this EC volume
									for _, shardSize := range ecShardInfo.ShardSizes {
										filteredEcTotalSize += shardSize
									}
								}
							}
						}
					}
				}
			}
		}
		volumes = filteredVolumes
		totalSize = filteredTotalSize + filteredEcTotalSize
	}

	// Calculate unique data center, rack, disk type, collection, and version counts from filtered volumes
	dataCenterMap := make(map[string]bool)
	rackMap := make(map[string]bool)
	diskTypeMap := make(map[string]bool)
	collectionMap := make(map[string]bool)
	versionMap := make(map[string]bool)
	for _, volume := range volumes {
		if volume.DataCenter != "" {
			dataCenterMap[volume.DataCenter] = true
		}
		if volume.Rack != "" {
			rackMap[volume.Rack] = true
		}
		diskType := volume.DiskType
		if diskType == "" {
			diskType = "hdd" // Default to hdd if not specified
		}
		diskTypeMap[diskType] = true
		// Handle collection for display purposes
		collectionName := volume.Collection
		if collectionName == "" {
			collectionName = "default"
		}
		collectionMap[collectionName] = true
		versionMap[fmt.Sprintf("%d", volume.Version)] = true
	}
	dataCenterCount := len(dataCenterMap)
	rackCount := len(rackMap)
	diskTypeCount := len(diskTypeMap)
	collectionCount := len(collectionMap)
	versionCount := len(versionMap)

	// Sort volumes
	s.sortVolumes(volumes, sortBy, sortOrder)

	// Get volume size limit from master
	var volumeSizeLimit uint64
	err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		if err != nil {
			return err
		}
		volumeSizeLimit = uint64(resp.VolumeSizeLimitMB) * 1024 * 1024 // Convert MB to bytes
		return nil
	})
	if err != nil {
		// If we can't get the limit, set a default
		volumeSizeLimit = 30 * 1024 * 1024 * 1024 // 30GB default
	}

	// Calculate pagination
	totalVolumes := len(volumes)
	totalPages := (totalVolumes + pageSize - 1) / pageSize
	if totalPages == 0 {
		totalPages = 1
	}

	// Apply pagination
	startIndex := (page - 1) * pageSize
	endIndex := startIndex + pageSize
	if startIndex >= totalVolumes {
		volumes = []VolumeWithTopology{}
	} else {
		if endIndex > totalVolumes {
			endIndex = totalVolumes
		}
		volumes = volumes[startIndex:endIndex]
	}

	// Determine conditional display flags and extract single values
	showDataCenterColumn := dataCenterCount > 1
	showRackColumn := rackCount > 1
	showDiskTypeColumn := diskTypeCount > 1
	showCollectionColumn := collectionCount > 1 && collection == "" // Hide column when filtering by collection
	showVersionColumn := versionCount > 1

	var singleDataCenter, singleRack, singleDiskType, singleCollection, singleVersion string
	var allVersions, allDiskTypes []string

	if dataCenterCount == 1 {
		for dc := range dataCenterMap {
			singleDataCenter = dc
			break
		}
	}
	if rackCount == 1 {
		for rack := range rackMap {
			singleRack = rack
			break
		}
	}
	if diskTypeCount == 1 {
		for diskType := range diskTypeMap {
			singleDiskType = diskType
			break
		}
	} else {
		// Collect all disk types and sort them
		for diskType := range diskTypeMap {
			allDiskTypes = append(allDiskTypes, diskType)
		}
		sort.Strings(allDiskTypes)
	}
	if collectionCount == 1 {
		for collection := range collectionMap {
			singleCollection = collection
			break
		}
	}
	if versionCount == 1 {
		for version := range versionMap {
			singleVersion = "v" + version
			break
		}
	} else {
		// Collect all versions and sort them
		for version := range versionMap {
			allVersions = append(allVersions, "v"+version)
		}
		sort.Strings(allVersions)
	}

	return &ClusterVolumesData{
		Volumes:              volumes,
		TotalVolumes:         totalVolumes,
		TotalSize:            totalSize,
		VolumeSizeLimit:      volumeSizeLimit,
		LastUpdated:          time.Now(),
		CurrentPage:          page,
		TotalPages:           totalPages,
		PageSize:             pageSize,
		SortBy:               sortBy,
		SortOrder:            sortOrder,
		DataCenterCount:      dataCenterCount,
		RackCount:            rackCount,
		DiskTypeCount:        diskTypeCount,
		CollectionCount:      collectionCount,
		VersionCount:         versionCount,
		ShowDataCenterColumn: showDataCenterColumn,
		ShowRackColumn:       showRackColumn,
		ShowDiskTypeColumn:   showDiskTypeColumn,
		ShowCollectionColumn: showCollectionColumn,
		ShowVersionColumn:    showVersionColumn,
		SingleDataCenter:     singleDataCenter,
		SingleRack:           singleRack,
		SingleDiskType:       singleDiskType,
		SingleCollection:     singleCollection,
		SingleVersion:        singleVersion,
		AllVersions:          allVersions,
		AllDiskTypes:         allDiskTypes,
		FilterCollection:     collection,
	}, nil
}
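// exampleListVolumes is an illustrative sketch, not part of the upstream file: it shows how a
// caller might page through cluster volumes and react to the conditional display flags.
// The function name and the hard-coded paging/sorting values are assumptions for demonstration.
func exampleListVolumes(s *AdminServer) error {
	// First page, 50 rows per page, largest volumes first, no collection filter.
	data, err := s.GetClusterVolumes(1, 50, "size", "desc", "")
	if err != nil {
		return err
	}
	fmt.Printf("page %d/%d: %d of %d volumes, %d bytes total\n",
		data.CurrentPage, data.TotalPages, len(data.Volumes), data.TotalVolumes, data.TotalSize)
	// Columns such as data center are only rendered when the value varies across volumes.
	if !data.ShowDataCenterColumn {
		fmt.Printf("all volumes live in data center %s\n", data.SingleDataCenter)
	}
	return nil
}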
// sortVolumes sorts the volumes slice based on the specified field and order
func (s *AdminServer) sortVolumes(volumes []VolumeWithTopology, sortBy string, sortOrder string) {
	sort.Slice(volumes, func(i, j int) bool {
		var less bool
		switch sortBy {
		case "id":
			less = volumes[i].Id < volumes[j].Id
		case "server":
			less = volumes[i].Server < volumes[j].Server
		case "datacenter":
			less = volumes[i].DataCenter < volumes[j].DataCenter
		case "rack":
			less = volumes[i].Rack < volumes[j].Rack
		case "collection":
			less = volumes[i].Collection < volumes[j].Collection
		case "size":
			less = volumes[i].Size < volumes[j].Size
		case "filecount":
			less = volumes[i].FileCount < volumes[j].FileCount
		case "replication":
			less = volumes[i].ReplicaPlacement < volumes[j].ReplicaPlacement
		case "disktype":
			less = volumes[i].DiskType < volumes[j].DiskType
		case "version":
			less = volumes[i].Version < volumes[j].Version
		default:
			less = volumes[i].Id < volumes[j].Id
		}
		if sortOrder == "desc" {
			return !less
		}
		return less
	})
}
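// exampleSortVolumes is an illustrative sketch, not upstream code: a minimal demonstration of
// sortVolumes, assuming VolumeWithTopology embeds *master_pb.VolumeInformationMessage as the
// construction in GetClusterVolumes suggests. An unknown sortBy value falls back to sorting by
// volume ID, and "desc" simply negates the ascending comparison.
func exampleSortVolumes(s *AdminServer) {
	vols := []VolumeWithTopology{
		{VolumeInformationMessage: &master_pb.VolumeInformationMessage{Id: 2, Size: 50}, Server: "b:8080"},
		{VolumeInformationMessage: &master_pb.VolumeInformationMessage{Id: 1, Size: 90}, Server: "a:8080"},
	}
	s.sortVolumes(vols, "size", "desc") // largest first: Id 1 (90 bytes) before Id 2 (50 bytes)
	s.sortVolumes(vols, "bogus", "asc") // unknown field: falls back to ascending Id order
}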
// GetVolumeDetails retrieves detailed information about a specific volume
func (s *AdminServer) GetVolumeDetails(volumeID int, server string) (*VolumeDetailsData, error) {
	var primaryVolume VolumeWithTopology
	var replicas []VolumeWithTopology
	var volumeSizeLimit uint64
	var found bool

	// Find the volume and all its replicas in the cluster
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							for _, volInfo := range diskInfo.VolumeInfos {
								if int(volInfo.Id) == volumeID {
									diskType := volInfo.DiskType
									if diskType == "" {
										diskType = "hdd"
									}
									volume := VolumeWithTopology{
										VolumeInformationMessage: volInfo,
										Server:                   node.Id,
										DataCenter:               dc.Id,
										Rack:                     rack.Id,
									}
									// If this is the requested server, it's the primary volume
									if node.Id == server {
										primaryVolume = volume
										found = true
									} else {
										// This is a replica on another server
										replicas = append(replicas, volume)
									}
								}
							}
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, fmt.Errorf("volume %d not found on server %s", volumeID, server)
	}

	// Get volume size limit from master
	err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		if err != nil {
			return err
		}
		volumeSizeLimit = uint64(resp.VolumeSizeLimitMB) * 1024 * 1024 // Convert MB to bytes
		return nil
	})
	if err != nil {
		// If we can't get the limit, set a default
		volumeSizeLimit = 30 * 1024 * 1024 * 1024 // 30GB default
	}

	return &VolumeDetailsData{
		Volume:           primaryVolume,
		Replicas:         replicas,
		VolumeSizeLimit:  volumeSizeLimit,
		ReplicationCount: len(replicas) + 1, // Include the primary volume
		LastUpdated:      time.Now(),
	}, nil
}
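// exampleDescribeVolume is an illustrative sketch, not part of the original file: it reads the
// result of GetVolumeDetails. The volume ID and server address are made-up placeholders.
func exampleDescribeVolume(s *AdminServer) error {
	details, err := s.GetVolumeDetails(42, "10.0.0.1:8080")
	if err != nil {
		return err // includes the "volume not found on server" case
	}
	fmt.Printf("volume %d on %s: %d replica(s) elsewhere, size limit %d bytes\n",
		details.Volume.Id, details.Volume.Server, len(details.Replicas), details.VolumeSizeLimit)
	return nil
}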
// VacuumVolume performs a vacuum operation on a specific volume
func (s *AdminServer) VacuumVolume(volumeID int, server string) error {
	return s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		_, err := client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
			VolumeId:         uint32(volumeID),
			GarbageThreshold: 0.0001, // A very low threshold to ensure all garbage is collected
			Collection:       "",     // Empty for all collections
		})
		return err
	})
}
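// exampleVacuumIfNeeded is an illustrative sketch, not upstream code: it triggers VacuumVolume
// only when a volume's reported garbage ratio is high, using the DeletedByteCount counter from
// the master protobuf. The 25% threshold is an arbitrary assumption; the request above always
// passes a near-zero GarbageThreshold to the master.
func exampleVacuumIfNeeded(s *AdminServer, v VolumeWithTopology) error {
	if v.Size > 0 && v.DeletedByteCount*4 > v.Size { // more than ~25% of the volume is deleted data
		return s.VacuumVolume(int(v.Id), v.Server)
	}
	return nil
}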
// GetClusterVolumeServers retrieves cluster volume servers data including EC shard information
func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) {
	var volumeServerMap map[string]*VolumeServer

	// Make only ONE VolumeList call and use it for both topology building AND EC shard processing
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		// Get volume size limit from response, default to 30GB if not set
		volumeSizeLimitMB := resp.VolumeSizeLimitMb
		if volumeSizeLimitMB == 0 {
			volumeSizeLimitMB = 30000 // default to 30000MB (30GB)
		}

		// Build basic topology from the VolumeList response (replaces GetClusterTopology call)
		volumeServerMap = make(map[string]*VolumeServer)
		if resp.TopologyInfo != nil {
			// Process topology to build basic volume server info (similar to cluster_topology.go logic)
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						// Initialize volume server if not exists
						if volumeServerMap[node.Id] == nil {
							volumeServerMap[node.Id] = &VolumeServer{
								Address:        node.Id,
								DataCenter:     dc.Id,
								Rack:           rack.Id,
								Volumes:        0,
								DiskUsage:      0,
								DiskCapacity:   0,
								EcVolumes:      0,
								EcShards:       0,
								EcShardDetails: []VolumeServerEcInfo{},
							}
						}
						vs := volumeServerMap[node.Id]

						// Process EC shard information for this server at volume server level (not per-disk)
						ecVolumeMap := make(map[uint32]*VolumeServerEcInfo)
						// Temporary map to accumulate shard info across disks
						ecShardAccumulator := make(map[uint32][]*master_pb.VolumeEcShardInformationMessage)

						// Process disk information
						for _, diskInfo := range node.DiskInfos {
							vs.DiskCapacity += int64(diskInfo.MaxVolumeCount) * int64(volumeSizeLimitMB) * 1024 * 1024 // Use actual volume size limit

							// Count regular volumes and calculate disk usage
							for _, volInfo := range diskInfo.VolumeInfos {
								vs.Volumes++
								vs.DiskUsage += int64(volInfo.Size)
							}

							// Accumulate EC shard information across all disks for this volume server
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								volumeId := ecShardInfo.Id
								ecShardAccumulator[volumeId] = append(ecShardAccumulator[volumeId], ecShardInfo)
							}
						}

						// Process accumulated EC shard information per volume
						for volumeId, ecShardInfos := range ecShardAccumulator {
							if len(ecShardInfos) == 0 {
								continue
							}

							// Initialize EC volume info
							ecInfo := &VolumeServerEcInfo{
								VolumeID:     volumeId,
								Collection:   ecShardInfos[0].Collection,
								ShardCount:   0,
								EcIndexBits:  0,
								ShardNumbers: []int{},
								ShardSizes:   make(map[int]int64),
								TotalSize:    0,
							}

							// Merge EcIndexBits from all disks and collect shard sizes
							allShardSizes := make(map[erasure_coding.ShardId]int64)
							for _, ecShardInfo := range ecShardInfos {
								ecInfo.EcIndexBits |= ecShardInfo.EcIndexBits
								// Collect shard sizes from this disk
								shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
								shardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
									if size, found := erasure_coding.GetShardSize(ecShardInfo, shardId); found {
										allShardSizes[shardId] = size
									}
								})
							}

							// Process final merged shard information
							finalShardBits := erasure_coding.ShardBits(ecInfo.EcIndexBits)
							finalShardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
								ecInfo.ShardCount++
								ecInfo.ShardNumbers = append(ecInfo.ShardNumbers, int(shardId))
								vs.EcShards++

								// Add shard size if available
								if shardSize, exists := allShardSizes[shardId]; exists {
									ecInfo.ShardSizes[int(shardId)] = shardSize
									ecInfo.TotalSize += shardSize
									vs.DiskUsage += shardSize // Add EC shard size to total disk usage
								}
							})
							ecVolumeMap[volumeId] = ecInfo
						}

						// Convert EC volume map to slice and update volume server (after processing all disks)
						for _, ecInfo := range ecVolumeMap {
							vs.EcShardDetails = append(vs.EcShardDetails, *ecInfo)
							vs.EcVolumes++
						}
					}
				}
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Convert map back to slice
	var volumeServers []VolumeServer
	for _, vs := range volumeServerMap {
		volumeServers = append(volumeServers, *vs)
	}

	var totalCapacity int64
	var totalVolumes int
	for _, vs := range volumeServers {
		totalCapacity += vs.DiskCapacity
		totalVolumes += vs.Volumes
	}

	return &ClusterVolumeServersData{
		VolumeServers:      volumeServers,
		TotalVolumeServers: len(volumeServers),
		TotalVolumes:       totalVolumes,
		TotalCapacity:      totalCapacity,
		LastUpdated:        time.Now(),
	}, nil
}
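// exampleCapacityReport is an illustrative sketch, not part of the original file: it consumes
// the aggregated per-server data returned above. The utilization percentage is an assumption
// about how the numbers might be presented.
func exampleCapacityReport(s *AdminServer) error {
	data, err := s.GetClusterVolumeServers()
	if err != nil {
		return err
	}
	for _, vs := range data.VolumeServers {
		if vs.DiskCapacity == 0 {
			continue // avoid dividing by zero for servers reporting no capacity
		}
		usedPct := float64(vs.DiskUsage) / float64(vs.DiskCapacity) * 100
		fmt.Printf("%s (%s/%s): %d volumes, %d EC volumes, %.1f%% of capacity used\n",
			vs.Address, vs.DataCenter, vs.Rack, vs.Volumes, vs.EcVolumes, usedPct)
	}
	return nil
}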