topology_management.go

package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// UpdateTopology updates the topology information from the master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	at.topologyInfo = topologyInfo
	at.lastUpdated = time.Now()

	// Rebuild structured topology
	at.nodes = make(map[string]*activeNode)
	at.disks = make(map[string]*activeDisk)

	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				node := &activeNode{
					nodeID:     nodeInfo.Id,
					dataCenter: dc.Id,
					rack:       rack.Id,
					nodeInfo:   nodeInfo,
					disks:      make(map[uint32]*activeDisk),
				}

				// Add disks for this node
				for diskType, diskInfo := range nodeInfo.DiskInfos {
					disk := &activeDisk{
						DiskInfo: &DiskInfo{
							NodeID:     nodeInfo.Id,
							DiskID:     diskInfo.DiskId,
							DiskType:   diskType,
							DataCenter: dc.Id,
							Rack:       rack.Id,
							DiskInfo:   diskInfo,
						},
					}
					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
					node.disks[diskInfo.DiskId] = disk
					at.disks[diskKey] = disk
				}

				at.nodes[nodeInfo.Id] = node
			}
		}
	}

	// Rebuild performance indexes for O(1) lookups
	at.rebuildIndexes()

	// Reassign task states to updated topology
	at.reassignTaskStates()

	glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
		len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))
	return nil
}
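
// Example (illustrative sketch, not part of this file): a caller that periodically
// refreshes the topology from the master might look roughly like the following.
// The fetchTopologyFromMaster helper and the 30s interval are hypothetical.
//
//	ticker := time.NewTicker(30 * time.Second)
//	for range ticker.C {
//		topologyInfo, err := fetchTopologyFromMaster()
//		if err != nil {
//			glog.Warningf("failed to fetch topology: %v", err)
//			continue
//		}
//		if err := activeTopology.UpdateTopology(topologyInfo); err != nil {
//			glog.Warningf("failed to update topology: %v", err)
//		}
//	}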

// GetAvailableDisks returns disks that can accept new tasks of the given type
// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}
		if at.isDiskAvailable(disk, taskType) {
			// Create a copy with current load count and effective capacity
			diskCopy := *disk.DiskInfo
			diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
			available = append(available, &diskCopy)
		}
	}
	return available
}
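
// Example (illustrative sketch): picking a destination disk for a balance task while
// excluding the source node. TaskTypeBalance is assumed to be one of the defined
// TaskType constants; the least-loaded selection policy here is deliberately naive.
//
//	candidates := activeTopology.GetAvailableDisks(TaskTypeBalance, sourceNodeID)
//	var target *DiskInfo
//	for _, d := range candidates {
//		if target == nil || d.LoadCount < target.LoadCount {
//			target = d // prefer the least-loaded candidate
//		}
//	}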

// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	for _, task := range at.recentTasks {
		if task.VolumeID == volumeID && task.TaskType == taskType {
			return true
		}
	}
	return false
}

// GetAllNodes returns information about all nodes (public interface)
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	result := make(map[string]*master_pb.DataNodeInfo)
	for nodeID, node := range at.nodes {
		result[nodeID] = node.nodeInfo
	}
	return result
}

// GetTopologyInfo returns the current topology information (read-only access)
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	return at.topologyInfo
}

// GetNodeDisks returns all disks for a specific node
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	node, exists := at.nodes[nodeID]
	if !exists {
		return nil
	}

	var disks []*DiskInfo
	for _, disk := range node.disks {
		diskCopy := *disk.DiskInfo
		diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
		disks = append(disks, &diskCopy)
	}
	return disks
}

// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
func (at *ActiveTopology) rebuildIndexes() {
	// Clear existing indexes
	at.volumeIndex = make(map[uint32][]string)
	at.ecShardIndex = make(map[uint32][]string)

	// Rebuild indexes from current topology
	for _, dc := range at.topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				for _, diskInfo := range nodeInfo.DiskInfos {
					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)

					// Index volumes
					for _, volumeInfo := range diskInfo.VolumeInfos {
						volumeID := volumeInfo.Id
						at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
					}

					// Index EC shards
					for _, ecShardInfo := range diskInfo.EcShardInfos {
						volumeID := ecShardInfo.Id
						at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
					}
				}
			}
		}
	}
}
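
// For illustration, after rebuildIndexes the indexes map volume IDs to the
// "nodeID:diskID" keys of the disks that hold them, e.g. (hypothetical values):
//
//	volumeIndex[42]  -> ["10.0.0.1:8080:0", "10.0.0.2:8080:1"]
//	ecShardIndex[42] -> ["10.0.0.3:8080:0"]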

// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKeys, exists := at.volumeIndex[volumeID]
	if !exists {
		return []VolumeReplica{}
	}

	var replicas []VolumeReplica
	for _, diskKey := range diskKeys {
		if disk, diskExists := at.disks[diskKey]; diskExists {
			// Verify collection matches (since index doesn't include collection)
			if at.volumeMatchesCollection(disk, volumeID, collection) {
				replicas = append(replicas, VolumeReplica{
					ServerID:   disk.NodeID,
					DiskID:     disk.DiskID,
					DataCenter: disk.DataCenter,
					Rack:       disk.Rack,
				})
			}
		}
	}
	return replicas
}
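
// Example (illustrative sketch): resolving the replicas of a volume before scheduling
// work against it. The volumeID and collection values are placeholders.
//
//	replicas := activeTopology.GetVolumeLocations(volumeID, collection)
//	for _, r := range replicas {
//		glog.V(2).Infof("volume %d replica on %s (disk %d, dc %s, rack %s)",
//			volumeID, r.ServerID, r.DiskID, r.DataCenter, r.Rack)
//	}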

// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKeys, exists := at.ecShardIndex[volumeID]
	if !exists {
		return []VolumeReplica{}
	}

	var ecShards []VolumeReplica
	for _, diskKey := range diskKeys {
		if disk, diskExists := at.disks[diskKey]; diskExists {
			// Verify collection matches (since index doesn't include collection)
			if at.ecShardMatchesCollection(disk, volumeID, collection) {
				ecShards = append(ecShards, VolumeReplica{
					ServerID:   disk.NodeID,
					DiskID:     disk.DiskID,
					DataCenter: disk.DataCenter,
					Rack:       disk.Rack,
				})
			}
		}
	}
	return ecShards
}

// volumeMatchesCollection checks if a volume on a disk matches the given collection
func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return false
	}
	for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
		if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
			return true
		}
	}
	return false
}

// ecShardMatchesCollection checks if EC shards on a disk match the given collection
func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return false
	}
	for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
		if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
			return true
		}
	}
	return false
}