cluster_topology.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package dash
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  8. )
  9. // GetClusterTopology returns the current cluster topology with caching
  10. func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) {
  11. now := time.Now()
  12. if s.cachedTopology != nil && now.Sub(s.lastCacheUpdate) < s.cacheExpiration {
  13. return s.cachedTopology, nil
  14. }
  15. topology := &ClusterTopology{
  16. UpdatedAt: now,
  17. }
  18. // Use gRPC only
  19. err := s.getTopologyViaGRPC(topology)
  20. if err != nil {
  21. currentMaster := s.masterClient.GetMaster(context.Background())
  22. glog.Errorf("Failed to connect to master server %s: %v", currentMaster, err)
  23. return nil, fmt.Errorf("gRPC topology request failed: %w", err)
  24. }
  25. // Cache the result
  26. s.cachedTopology = topology
  27. s.lastCacheUpdate = now
  28. return topology, nil
  29. }
  30. // getTopologyViaGRPC gets topology using gRPC (original method)
  31. func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error {
  32. // Get cluster status from master
  33. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  34. resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  35. if err != nil {
  36. currentMaster := s.masterClient.GetMaster(context.Background())
  37. glog.Errorf("Failed to get volume list from master %s: %v", currentMaster, err)
  38. return err
  39. }
  40. if resp.TopologyInfo != nil {
  41. // Process gRPC response
  42. for _, dc := range resp.TopologyInfo.DataCenterInfos {
  43. dataCenter := DataCenter{
  44. ID: dc.Id,
  45. Racks: []Rack{},
  46. }
  47. for _, rack := range dc.RackInfos {
  48. rackObj := Rack{
  49. ID: rack.Id,
  50. Nodes: []VolumeServer{},
  51. }
  52. for _, node := range rack.DataNodeInfos {
  53. // Calculate totals from disk infos
  54. var totalVolumes int64
  55. var totalMaxVolumes int64
  56. var totalSize int64
  57. var totalFiles int64
  58. for _, diskInfo := range node.DiskInfos {
  59. totalVolumes += diskInfo.VolumeCount
  60. totalMaxVolumes += diskInfo.MaxVolumeCount
  61. // Sum up individual volume information
  62. for _, volInfo := range diskInfo.VolumeInfos {
  63. totalSize += int64(volInfo.Size)
  64. totalFiles += int64(volInfo.FileCount)
  65. }
  66. // Sum up EC shard sizes
  67. for _, ecShardInfo := range diskInfo.EcShardInfos {
  68. for _, shardSize := range ecShardInfo.ShardSizes {
  69. totalSize += shardSize
  70. }
  71. }
  72. }
  73. vs := VolumeServer{
  74. ID: node.Id,
  75. Address: node.Id,
  76. DataCenter: dc.Id,
  77. Rack: rack.Id,
  78. PublicURL: node.Id,
  79. Volumes: int(totalVolumes),
  80. MaxVolumes: int(totalMaxVolumes),
  81. DiskUsage: totalSize,
  82. DiskCapacity: totalMaxVolumes * int64(resp.VolumeSizeLimitMb) * 1024 * 1024,
  83. LastHeartbeat: time.Now(),
  84. }
  85. rackObj.Nodes = append(rackObj.Nodes, vs)
  86. topology.VolumeServers = append(topology.VolumeServers, vs)
  87. topology.TotalVolumes += vs.Volumes
  88. topology.TotalFiles += totalFiles
  89. topology.TotalSize += totalSize
  90. }
  91. dataCenter.Racks = append(dataCenter.Racks, rackObj)
  92. }
  93. topology.DataCenters = append(topology.DataCenters, dataCenter)
  94. }
  95. }
  96. return nil
  97. })
  98. return err
  99. }
  100. // InvalidateCache forces a refresh of cached data
  101. func (s *AdminServer) InvalidateCache() {
  102. s.lastCacheUpdate = time.Time{}
  103. s.cachedTopology = nil
  104. s.lastFilerUpdate = time.Time{}
  105. s.cachedFilers = nil
  106. }