// collection_management.go

  1. package dash
  2. import (
  3. "context"
  4. "sort"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  7. )
  8. // GetClusterCollections retrieves cluster collections data
  9. func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) {
  10. var collections []CollectionInfo
  11. var totalVolumes int
  12. var totalEcVolumes int
  13. var totalFiles int64
  14. var totalSize int64
  15. collectionMap := make(map[string]*CollectionInfo)
  16. // Get actual collection information from volume data
  17. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  18. resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  19. if err != nil {
  20. return err
  21. }
  22. if resp.TopologyInfo != nil {
  23. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  24. for _, rack := range dc.RackInfos {
  25. for _, node := range rack.DataNodeInfos {
  26. for _, diskInfo := range node.DiskInfos {
  27. // Process regular volumes
  28. for _, volInfo := range diskInfo.VolumeInfos {
  29. // Extract collection name from volume info
  30. collectionName := volInfo.Collection
  31. if collectionName == "" {
  32. collectionName = "default" // Default collection for volumes without explicit collection
  33. }
  34. // Get disk type from volume info, default to hdd if empty
  35. diskType := volInfo.DiskType
  36. if diskType == "" {
  37. diskType = "hdd"
  38. }
  39. // Get or create collection info
  40. if collection, exists := collectionMap[collectionName]; exists {
  41. collection.VolumeCount++
  42. collection.FileCount += int64(volInfo.FileCount)
  43. collection.TotalSize += int64(volInfo.Size)
  44. // Update data center if this collection spans multiple DCs
  45. if collection.DataCenter != dc.Id && collection.DataCenter != "multi" {
  46. collection.DataCenter = "multi"
  47. }
  48. // Add disk type if not already present
  49. diskTypeExists := false
  50. for _, existingDiskType := range collection.DiskTypes {
  51. if existingDiskType == diskType {
  52. diskTypeExists = true
  53. break
  54. }
  55. }
  56. if !diskTypeExists {
  57. collection.DiskTypes = append(collection.DiskTypes, diskType)
  58. }
  59. totalVolumes++
  60. totalFiles += int64(volInfo.FileCount)
  61. totalSize += int64(volInfo.Size)
  62. } else {
  63. newCollection := CollectionInfo{
  64. Name: collectionName,
  65. DataCenter: dc.Id,
  66. VolumeCount: 1,
  67. EcVolumeCount: 0,
  68. FileCount: int64(volInfo.FileCount),
  69. TotalSize: int64(volInfo.Size),
  70. DiskTypes: []string{diskType},
  71. }
  72. collectionMap[collectionName] = &newCollection
  73. totalVolumes++
  74. totalFiles += int64(volInfo.FileCount)
  75. totalSize += int64(volInfo.Size)
  76. }
  77. }
  78. // Process EC volumes
  79. ecVolumeMap := make(map[uint32]bool) // Track unique EC volumes to avoid double counting
  80. for _, ecShardInfo := range diskInfo.EcShardInfos {
  81. // Extract collection name from EC shard info
  82. collectionName := ecShardInfo.Collection
  83. if collectionName == "" {
  84. collectionName = "default" // Default collection for EC volumes without explicit collection
  85. }
  86. // Only count each EC volume once (not per shard)
  87. if !ecVolumeMap[ecShardInfo.Id] {
  88. ecVolumeMap[ecShardInfo.Id] = true
  89. // Get disk type from disk info, default to hdd if empty
  90. diskType := diskInfo.Type
  91. if diskType == "" {
  92. diskType = "hdd"
  93. }
  94. // Get or create collection info
  95. if collection, exists := collectionMap[collectionName]; exists {
  96. collection.EcVolumeCount++
  97. // Update data center if this collection spans multiple DCs
  98. if collection.DataCenter != dc.Id && collection.DataCenter != "multi" {
  99. collection.DataCenter = "multi"
  100. }
  101. // Add disk type if not already present
  102. diskTypeExists := false
  103. for _, existingDiskType := range collection.DiskTypes {
  104. if existingDiskType == diskType {
  105. diskTypeExists = true
  106. break
  107. }
  108. }
  109. if !diskTypeExists {
  110. collection.DiskTypes = append(collection.DiskTypes, diskType)
  111. }
  112. totalEcVolumes++
  113. } else {
  114. newCollection := CollectionInfo{
  115. Name: collectionName,
  116. DataCenter: dc.Id,
  117. VolumeCount: 0,
  118. EcVolumeCount: 1,
  119. FileCount: 0,
  120. TotalSize: 0,
  121. DiskTypes: []string{diskType},
  122. }
  123. collectionMap[collectionName] = &newCollection
  124. totalEcVolumes++
  125. }
  126. }
  127. }
  128. }
  129. }
  130. }
  131. }
  132. }
  133. return nil
  134. })
  135. if err != nil {
  136. return nil, err
  137. }
  138. // Convert map to slice
  139. for _, collection := range collectionMap {
  140. collections = append(collections, *collection)
  141. }
  142. // Sort collections alphabetically by name
  143. sort.Slice(collections, func(i, j int) bool {
  144. return collections[i].Name < collections[j].Name
  145. })
  146. // If no collections found, show a message indicating no collections exist
  147. if len(collections) == 0 {
  148. // Return empty collections data instead of creating fake ones
  149. return &ClusterCollectionsData{
  150. Collections: []CollectionInfo{},
  151. TotalCollections: 0,
  152. TotalVolumes: 0,
  153. TotalEcVolumes: 0,
  154. TotalFiles: 0,
  155. TotalSize: 0,
  156. LastUpdated: time.Now(),
  157. }, nil
  158. }
  159. return &ClusterCollectionsData{
  160. Collections: collections,
  161. TotalCollections: len(collections),
  162. TotalVolumes: totalVolumes,
  163. TotalEcVolumes: totalEcVolumes,
  164. TotalFiles: totalFiles,
  165. TotalSize: totalSize,
  166. LastUpdated: time.Now(),
  167. }, nil
  168. }
  169. // GetCollectionDetails retrieves detailed information for a specific collection including volumes and EC volumes
  170. func (s *AdminServer) GetCollectionDetails(collectionName string, page int, pageSize int, sortBy string, sortOrder string) (*CollectionDetailsData, error) {
  171. // Set defaults
  172. if page < 1 {
  173. page = 1
  174. }
  175. if pageSize < 1 || pageSize > 1000 {
  176. pageSize = 25
  177. }
  178. if sortBy == "" {
  179. sortBy = "volume_id"
  180. }
  181. if sortOrder == "" {
  182. sortOrder = "asc"
  183. }
  184. var regularVolumes []VolumeWithTopology
  185. var ecVolumes []EcVolumeWithShards
  186. var totalFiles int64
  187. var totalSize int64
  188. dataCenters := make(map[string]bool)
  189. diskTypes := make(map[string]bool)
  190. // Get regular volumes for this collection
  191. regularVolumeData, err := s.GetClusterVolumes(1, 10000, "volume_id", "asc", collectionName) // Get all volumes
  192. if err != nil {
  193. return nil, err
  194. }
  195. regularVolumes = regularVolumeData.Volumes
  196. totalSize = regularVolumeData.TotalSize
  197. // Calculate total files from regular volumes
  198. for _, vol := range regularVolumes {
  199. totalFiles += int64(vol.FileCount)
  200. }
  201. // Collect data centers and disk types from regular volumes
  202. for _, vol := range regularVolumes {
  203. dataCenters[vol.DataCenter] = true
  204. diskTypes[vol.DiskType] = true
  205. }
  206. // Get EC volumes for this collection
  207. ecVolumeData, err := s.GetClusterEcVolumes(1, 10000, "volume_id", "asc", collectionName) // Get all EC volumes
  208. if err != nil {
  209. return nil, err
  210. }
  211. ecVolumes = ecVolumeData.EcVolumes
  212. // Collect data centers from EC volumes
  213. for _, ecVol := range ecVolumes {
  214. for _, dc := range ecVol.DataCenters {
  215. dataCenters[dc] = true
  216. }
  217. }
  218. // Combine all volumes for sorting and pagination
  219. type VolumeForSorting struct {
  220. Type string // "regular" or "ec"
  221. RegularVolume *VolumeWithTopology
  222. EcVolume *EcVolumeWithShards
  223. }
  224. var allVolumes []VolumeForSorting
  225. for i := range regularVolumes {
  226. allVolumes = append(allVolumes, VolumeForSorting{
  227. Type: "regular",
  228. RegularVolume: &regularVolumes[i],
  229. })
  230. }
  231. for i := range ecVolumes {
  232. allVolumes = append(allVolumes, VolumeForSorting{
  233. Type: "ec",
  234. EcVolume: &ecVolumes[i],
  235. })
  236. }
  237. // Sort all volumes
  238. sort.Slice(allVolumes, func(i, j int) bool {
  239. var less bool
  240. switch sortBy {
  241. case "volume_id":
  242. var idI, idJ uint32
  243. if allVolumes[i].Type == "regular" {
  244. idI = allVolumes[i].RegularVolume.Id
  245. } else {
  246. idI = allVolumes[i].EcVolume.VolumeID
  247. }
  248. if allVolumes[j].Type == "regular" {
  249. idJ = allVolumes[j].RegularVolume.Id
  250. } else {
  251. idJ = allVolumes[j].EcVolume.VolumeID
  252. }
  253. less = idI < idJ
  254. case "type":
  255. // Sort by type first (regular before ec), then by volume ID
  256. if allVolumes[i].Type == allVolumes[j].Type {
  257. var idI, idJ uint32
  258. if allVolumes[i].Type == "regular" {
  259. idI = allVolumes[i].RegularVolume.Id
  260. } else {
  261. idI = allVolumes[i].EcVolume.VolumeID
  262. }
  263. if allVolumes[j].Type == "regular" {
  264. idJ = allVolumes[j].RegularVolume.Id
  265. } else {
  266. idJ = allVolumes[j].EcVolume.VolumeID
  267. }
  268. less = idI < idJ
  269. } else {
  270. less = allVolumes[i].Type < allVolumes[j].Type // "ec" < "regular"
  271. }
  272. default:
  273. // Default to volume ID sort
  274. var idI, idJ uint32
  275. if allVolumes[i].Type == "regular" {
  276. idI = allVolumes[i].RegularVolume.Id
  277. } else {
  278. idI = allVolumes[i].EcVolume.VolumeID
  279. }
  280. if allVolumes[j].Type == "regular" {
  281. idJ = allVolumes[j].RegularVolume.Id
  282. } else {
  283. idJ = allVolumes[j].EcVolume.VolumeID
  284. }
  285. less = idI < idJ
  286. }
  287. if sortOrder == "desc" {
  288. return !less
  289. }
  290. return less
  291. })
  292. // Apply pagination
  293. totalVolumesAndEc := len(allVolumes)
  294. totalPages := (totalVolumesAndEc + pageSize - 1) / pageSize
  295. startIndex := (page - 1) * pageSize
  296. endIndex := startIndex + pageSize
  297. if endIndex > totalVolumesAndEc {
  298. endIndex = totalVolumesAndEc
  299. }
  300. if startIndex >= totalVolumesAndEc {
  301. startIndex = 0
  302. endIndex = 0
  303. }
  304. // Extract paginated results
  305. var paginatedRegularVolumes []VolumeWithTopology
  306. var paginatedEcVolumes []EcVolumeWithShards
  307. for i := startIndex; i < endIndex; i++ {
  308. if allVolumes[i].Type == "regular" {
  309. paginatedRegularVolumes = append(paginatedRegularVolumes, *allVolumes[i].RegularVolume)
  310. } else {
  311. paginatedEcVolumes = append(paginatedEcVolumes, *allVolumes[i].EcVolume)
  312. }
  313. }
  314. // Convert maps to slices
  315. var dcList []string
  316. for dc := range dataCenters {
  317. dcList = append(dcList, dc)
  318. }
  319. sort.Strings(dcList)
  320. var diskTypeList []string
  321. for diskType := range diskTypes {
  322. diskTypeList = append(diskTypeList, diskType)
  323. }
  324. sort.Strings(diskTypeList)
  325. return &CollectionDetailsData{
  326. CollectionName: collectionName,
  327. RegularVolumes: paginatedRegularVolumes,
  328. EcVolumes: paginatedEcVolumes,
  329. TotalVolumes: len(regularVolumes),
  330. TotalEcVolumes: len(ecVolumes),
  331. TotalFiles: totalFiles,
  332. TotalSize: totalSize,
  333. DataCenters: dcList,
  334. DiskTypes: diskTypeList,
  335. LastUpdated: time.Now(),
  336. Page: page,
  337. PageSize: pageSize,
  338. TotalPages: totalPages,
  339. SortBy: sortBy,
  340. SortOrder: sortOrder,
  341. }, nil
  342. }