| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554 |
- package dash
- import (
- "context"
- "fmt"
- "sort"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
- )
- // GetClusterVolumes retrieves cluster volumes data with pagination, sorting, and filtering
- func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterVolumesData, error) {
- // Set defaults
- if page < 1 {
- page = 1
- }
- if pageSize < 1 || pageSize > 1000 {
- pageSize = 100
- }
- if sortBy == "" {
- sortBy = "id"
- }
- if sortOrder == "" {
- sortOrder = "asc"
- }
- var volumes []VolumeWithTopology
- var totalSize int64
- var cachedTopologyInfo *master_pb.TopologyInfo
- // Get detailed volume information via gRPC
- err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- if err != nil {
- return err
- }
- // Cache the topology info for reuse
- cachedTopologyInfo = resp.TopologyInfo
- if resp.TopologyInfo != nil {
- for _, dc := range resp.TopologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, node := range rack.DataNodeInfos {
- for _, diskInfo := range node.DiskInfos {
- // Process regular volumes
- for _, volInfo := range diskInfo.VolumeInfos {
- volume := VolumeWithTopology{
- VolumeInformationMessage: volInfo,
- Server: node.Id,
- DataCenter: dc.Id,
- Rack: rack.Id,
- }
- volumes = append(volumes, volume)
- totalSize += int64(volInfo.Size)
- }
- // Process EC shards in the same loop
- for _, ecShardInfo := range diskInfo.EcShardInfos {
- // Add all shard sizes for this EC volume
- for _, shardSize := range ecShardInfo.ShardSizes {
- totalSize += shardSize
- }
- }
- }
- }
- }
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- // Filter by collection if specified
- if collection != "" {
- var filteredVolumes []VolumeWithTopology
- var filteredTotalSize int64
- var filteredEcTotalSize int64
- for _, volume := range volumes {
- if matchesCollection(volume.Collection, collection) {
- filteredVolumes = append(filteredVolumes, volume)
- filteredTotalSize += int64(volume.Size)
- }
- }
- // Filter EC shard sizes by collection using already processed data
- // This reuses the topology traversal done above (lines 43-71) to avoid a second pass
- if cachedTopologyInfo != nil {
- for _, dc := range cachedTopologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, node := range rack.DataNodeInfos {
- for _, diskInfo := range node.DiskInfos {
- for _, ecShardInfo := range diskInfo.EcShardInfos {
- if matchesCollection(ecShardInfo.Collection, collection) {
- // Add all shard sizes for this EC volume
- for _, shardSize := range ecShardInfo.ShardSizes {
- filteredEcTotalSize += shardSize
- }
- }
- }
- }
- }
- }
- }
- }
- volumes = filteredVolumes
- totalSize = filteredTotalSize + filteredEcTotalSize
- }
- // Calculate unique data center, rack, disk type, collection, and version counts from filtered volumes
- dataCenterMap := make(map[string]bool)
- rackMap := make(map[string]bool)
- diskTypeMap := make(map[string]bool)
- collectionMap := make(map[string]bool)
- versionMap := make(map[string]bool)
- for _, volume := range volumes {
- if volume.DataCenter != "" {
- dataCenterMap[volume.DataCenter] = true
- }
- if volume.Rack != "" {
- rackMap[volume.Rack] = true
- }
- diskType := volume.DiskType
- if diskType == "" {
- diskType = "hdd" // Default to hdd if not specified
- }
- diskTypeMap[diskType] = true
- // Handle collection for display purposes
- collectionName := volume.Collection
- if collectionName == "" {
- collectionName = "default"
- }
- collectionMap[collectionName] = true
- versionMap[fmt.Sprintf("%d", volume.Version)] = true
- }
- dataCenterCount := len(dataCenterMap)
- rackCount := len(rackMap)
- diskTypeCount := len(diskTypeMap)
- collectionCount := len(collectionMap)
- versionCount := len(versionMap)
- // Sort volumes
- s.sortVolumes(volumes, sortBy, sortOrder)
- // Get volume size limit from master
- var volumeSizeLimit uint64
- err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return err
- }
- volumeSizeLimit = uint64(resp.VolumeSizeLimitMB) * 1024 * 1024 // Convert MB to bytes
- return nil
- })
- if err != nil {
- // If we can't get the limit, set a default
- volumeSizeLimit = 30 * 1024 * 1024 * 1024 // 30GB default
- }
- // Calculate pagination
- totalVolumes := len(volumes)
- totalPages := (totalVolumes + pageSize - 1) / pageSize
- if totalPages == 0 {
- totalPages = 1
- }
- // Apply pagination
- startIndex := (page - 1) * pageSize
- endIndex := startIndex + pageSize
- if startIndex >= totalVolumes {
- volumes = []VolumeWithTopology{}
- } else {
- if endIndex > totalVolumes {
- endIndex = totalVolumes
- }
- volumes = volumes[startIndex:endIndex]
- }
- // Determine conditional display flags and extract single values
- showDataCenterColumn := dataCenterCount > 1
- showRackColumn := rackCount > 1
- showDiskTypeColumn := diskTypeCount > 1
- showCollectionColumn := collectionCount > 1 && collection == "" // Hide column when filtering by collection
- showVersionColumn := versionCount > 1
- var singleDataCenter, singleRack, singleDiskType, singleCollection, singleVersion string
- var allVersions, allDiskTypes []string
- if dataCenterCount == 1 {
- for dc := range dataCenterMap {
- singleDataCenter = dc
- break
- }
- }
- if rackCount == 1 {
- for rack := range rackMap {
- singleRack = rack
- break
- }
- }
- if diskTypeCount == 1 {
- for diskType := range diskTypeMap {
- singleDiskType = diskType
- break
- }
- } else {
- // Collect all disk types and sort them
- for diskType := range diskTypeMap {
- allDiskTypes = append(allDiskTypes, diskType)
- }
- sort.Strings(allDiskTypes)
- }
- if collectionCount == 1 {
- for collection := range collectionMap {
- singleCollection = collection
- break
- }
- }
- if versionCount == 1 {
- for version := range versionMap {
- singleVersion = "v" + version
- break
- }
- } else {
- // Collect all versions and sort them
- for version := range versionMap {
- allVersions = append(allVersions, "v"+version)
- }
- sort.Strings(allVersions)
- }
- return &ClusterVolumesData{
- Volumes: volumes,
- TotalVolumes: totalVolumes,
- TotalSize: totalSize,
- VolumeSizeLimit: volumeSizeLimit,
- LastUpdated: time.Now(),
- CurrentPage: page,
- TotalPages: totalPages,
- PageSize: pageSize,
- SortBy: sortBy,
- SortOrder: sortOrder,
- DataCenterCount: dataCenterCount,
- RackCount: rackCount,
- DiskTypeCount: diskTypeCount,
- CollectionCount: collectionCount,
- VersionCount: versionCount,
- ShowDataCenterColumn: showDataCenterColumn,
- ShowRackColumn: showRackColumn,
- ShowDiskTypeColumn: showDiskTypeColumn,
- ShowCollectionColumn: showCollectionColumn,
- ShowVersionColumn: showVersionColumn,
- SingleDataCenter: singleDataCenter,
- SingleRack: singleRack,
- SingleDiskType: singleDiskType,
- SingleCollection: singleCollection,
- SingleVersion: singleVersion,
- AllVersions: allVersions,
- AllDiskTypes: allDiskTypes,
- FilterCollection: collection,
- }, nil
- }
- // sortVolumes sorts the volumes slice based on the specified field and order
- func (s *AdminServer) sortVolumes(volumes []VolumeWithTopology, sortBy string, sortOrder string) {
- sort.Slice(volumes, func(i, j int) bool {
- var less bool
- switch sortBy {
- case "id":
- less = volumes[i].Id < volumes[j].Id
- case "server":
- less = volumes[i].Server < volumes[j].Server
- case "datacenter":
- less = volumes[i].DataCenter < volumes[j].DataCenter
- case "rack":
- less = volumes[i].Rack < volumes[j].Rack
- case "collection":
- less = volumes[i].Collection < volumes[j].Collection
- case "size":
- less = volumes[i].Size < volumes[j].Size
- case "filecount":
- less = volumes[i].FileCount < volumes[j].FileCount
- case "replication":
- less = volumes[i].ReplicaPlacement < volumes[j].ReplicaPlacement
- case "disktype":
- less = volumes[i].DiskType < volumes[j].DiskType
- case "version":
- less = volumes[i].Version < volumes[j].Version
- default:
- less = volumes[i].Id < volumes[j].Id
- }
- if sortOrder == "desc" {
- return !less
- }
- return less
- })
- }
- // GetVolumeDetails retrieves detailed information about a specific volume
- func (s *AdminServer) GetVolumeDetails(volumeID int, server string) (*VolumeDetailsData, error) {
- var primaryVolume VolumeWithTopology
- var replicas []VolumeWithTopology
- var volumeSizeLimit uint64
- var found bool
- // Find the volume and all its replicas in the cluster
- err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- if err != nil {
- return err
- }
- if resp.TopologyInfo != nil {
- for _, dc := range resp.TopologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, node := range rack.DataNodeInfos {
- for _, diskInfo := range node.DiskInfos {
- for _, volInfo := range diskInfo.VolumeInfos {
- if int(volInfo.Id) == volumeID {
- diskType := volInfo.DiskType
- if diskType == "" {
- diskType = "hdd"
- }
- volume := VolumeWithTopology{
- VolumeInformationMessage: volInfo,
- Server: node.Id,
- DataCenter: dc.Id,
- Rack: rack.Id,
- }
- // If this is the requested server, it's the primary volume
- if node.Id == server {
- primaryVolume = volume
- found = true
- } else {
- // This is a replica on another server
- replicas = append(replicas, volume)
- }
- }
- }
- }
- }
- }
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- if !found {
- return nil, fmt.Errorf("volume %d not found on server %s", volumeID, server)
- }
- // Get volume size limit from master
- err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return err
- }
- volumeSizeLimit = uint64(resp.VolumeSizeLimitMB) * 1024 * 1024 // Convert MB to bytes
- return nil
- })
- if err != nil {
- // If we can't get the limit, set a default
- volumeSizeLimit = 30 * 1024 * 1024 * 1024 // 30GB default
- }
- return &VolumeDetailsData{
- Volume: primaryVolume,
- Replicas: replicas,
- VolumeSizeLimit: volumeSizeLimit,
- ReplicationCount: len(replicas) + 1, // Include the primary volume
- LastUpdated: time.Now(),
- }, nil
- }
- // VacuumVolume performs a vacuum operation on a specific volume
- func (s *AdminServer) VacuumVolume(volumeID int, server string) error {
- return s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- _, err := client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
- VolumeId: uint32(volumeID),
- GarbageThreshold: 0.0001, // A very low threshold to ensure all garbage is collected
- Collection: "", // Empty for all collections
- })
- return err
- })
- }
- // GetClusterVolumeServers retrieves cluster volume servers data including EC shard information
- func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) {
- var volumeServerMap map[string]*VolumeServer
- // Make only ONE VolumeList call and use it for both topology building AND EC shard processing
- err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- if err != nil {
- return err
- }
- // Get volume size limit from response, default to 30GB if not set
- volumeSizeLimitMB := resp.VolumeSizeLimitMb
- if volumeSizeLimitMB == 0 {
- volumeSizeLimitMB = 30000 // default to 30000MB (30GB)
- }
- // Build basic topology from the VolumeList response (replaces GetClusterTopology call)
- volumeServerMap = make(map[string]*VolumeServer)
- if resp.TopologyInfo != nil {
- // Process topology to build basic volume server info (similar to cluster_topology.go logic)
- for _, dc := range resp.TopologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, node := range rack.DataNodeInfos {
- // Initialize volume server if not exists
- if volumeServerMap[node.Id] == nil {
- volumeServerMap[node.Id] = &VolumeServer{
- Address: node.Id,
- DataCenter: dc.Id,
- Rack: rack.Id,
- Volumes: 0,
- DiskUsage: 0,
- DiskCapacity: 0,
- EcVolumes: 0,
- EcShards: 0,
- EcShardDetails: []VolumeServerEcInfo{},
- }
- }
- vs := volumeServerMap[node.Id]
- // Process EC shard information for this server at volume server level (not per-disk)
- ecVolumeMap := make(map[uint32]*VolumeServerEcInfo)
- // Temporary map to accumulate shard info across disks
- ecShardAccumulator := make(map[uint32][]*master_pb.VolumeEcShardInformationMessage)
- // Process disk information
- for _, diskInfo := range node.DiskInfos {
- vs.DiskCapacity += int64(diskInfo.MaxVolumeCount) * int64(volumeSizeLimitMB) * 1024 * 1024 // Use actual volume size limit
- // Count regular volumes and calculate disk usage
- for _, volInfo := range diskInfo.VolumeInfos {
- vs.Volumes++
- vs.DiskUsage += int64(volInfo.Size)
- }
- // Accumulate EC shard information across all disks for this volume server
- for _, ecShardInfo := range diskInfo.EcShardInfos {
- volumeId := ecShardInfo.Id
- ecShardAccumulator[volumeId] = append(ecShardAccumulator[volumeId], ecShardInfo)
- }
- }
- // Process accumulated EC shard information per volume
- for volumeId, ecShardInfos := range ecShardAccumulator {
- if len(ecShardInfos) == 0 {
- continue
- }
- // Initialize EC volume info
- ecInfo := &VolumeServerEcInfo{
- VolumeID: volumeId,
- Collection: ecShardInfos[0].Collection,
- ShardCount: 0,
- EcIndexBits: 0,
- ShardNumbers: []int{},
- ShardSizes: make(map[int]int64),
- TotalSize: 0,
- }
- // Merge EcIndexBits from all disks and collect shard sizes
- allShardSizes := make(map[erasure_coding.ShardId]int64)
- for _, ecShardInfo := range ecShardInfos {
- ecInfo.EcIndexBits |= ecShardInfo.EcIndexBits
- // Collect shard sizes from this disk
- shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
- shardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
- if size, found := erasure_coding.GetShardSize(ecShardInfo, shardId); found {
- allShardSizes[shardId] = size
- }
- })
- }
- // Process final merged shard information
- finalShardBits := erasure_coding.ShardBits(ecInfo.EcIndexBits)
- finalShardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
- ecInfo.ShardCount++
- ecInfo.ShardNumbers = append(ecInfo.ShardNumbers, int(shardId))
- vs.EcShards++
- // Add shard size if available
- if shardSize, exists := allShardSizes[shardId]; exists {
- ecInfo.ShardSizes[int(shardId)] = shardSize
- ecInfo.TotalSize += shardSize
- vs.DiskUsage += shardSize // Add EC shard size to total disk usage
- }
- })
- ecVolumeMap[volumeId] = ecInfo
- }
- // Convert EC volume map to slice and update volume server (after processing all disks)
- for _, ecInfo := range ecVolumeMap {
- vs.EcShardDetails = append(vs.EcShardDetails, *ecInfo)
- vs.EcVolumes++
- }
- }
- }
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- // Convert map back to slice
- var volumeServers []VolumeServer
- for _, vs := range volumeServerMap {
- volumeServers = append(volumeServers, *vs)
- }
- var totalCapacity int64
- var totalVolumes int
- for _, vs := range volumeServers {
- totalCapacity += vs.DiskCapacity
- totalVolumes += vs.Volumes
- }
- return &ClusterVolumeServersData{
- VolumeServers: volumeServers,
- TotalVolumeServers: len(volumeServers),
- TotalVolumes: totalVolumes,
- TotalCapacity: totalCapacity,
- LastUpdated: time.Now(),
- }, nil
- }
|