package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// UpdateTopology updates the topology information from master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	at.topologyInfo = topologyInfo
	at.lastUpdated = time.Now()

	// Rebuild structured topology
	at.nodes = make(map[string]*activeNode)
	at.disks = make(map[string]*activeDisk)

	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				node := &activeNode{
					nodeID:     nodeInfo.Id,
					dataCenter: dc.Id,
					rack:       rack.Id,
					nodeInfo:   nodeInfo,
					disks:      make(map[uint32]*activeDisk),
				}

				// Add disks for this node
				for diskType, diskInfo := range nodeInfo.DiskInfos {
					disk := &activeDisk{
						DiskInfo: &DiskInfo{
							NodeID:     nodeInfo.Id,
							DiskID:     diskInfo.DiskId,
							DiskType:   diskType,
							DataCenter: dc.Id,
							Rack:       rack.Id,
							DiskInfo:   diskInfo,
						},
					}

					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
					node.disks[diskInfo.DiskId] = disk
					at.disks[diskKey] = disk
				}

				at.nodes[nodeInfo.Id] = node
			}
		}
	}

	// Rebuild performance indexes for O(1) lookups
	at.rebuildIndexes()

	// Reassign task states to updated topology
	at.reassignTaskStates()

	glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
		len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))

	return nil
}
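// Usage sketch (illustrative only): a caller that has just fetched a
// *master_pb.TopologyInfo from the master could refresh the active view as
// below. The constructor name NewActiveTopology and its argument are assumed
// for illustration; use however ActiveTopology instances are actually created
// in this package.
//
//	at := NewActiveTopology(10) // assumed constructor; 10 = hypothetical recent-task window
//	if err := at.UpdateTopology(topologyInfo); err != nil {
//		glog.Errorf("failed to update topology: %v", err)
//	}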
// GetAvailableDisks returns disks that can accept new tasks of the given type
// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		if at.isDiskAvailable(disk, taskType) {
			// Create a copy with current load count and effective capacity
			diskCopy := *disk.DiskInfo
			diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
			available = append(available, &diskCopy)
		}
	}

	return available
}
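// Usage sketch (illustrative only): choosing a destination disk for a task
// while skipping the node the data currently lives on. TaskTypeBalance and
// sourceNodeID are assumed names used purely for illustration; real callers
// would also rank candidates by LoadCount or effective capacity.
//
//	candidates := at.GetAvailableDisks(TaskTypeBalance, sourceNodeID)
//	if len(candidates) == 0 {
//		return fmt.Errorf("no available disks outside node %s", sourceNodeID)
//	}
//	target := candidates[0]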
// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	for _, task := range at.recentTasks {
		if task.VolumeID == volumeID && task.TaskType == taskType {
			return true
		}
	}

	return false
}
// GetAllNodes returns information about all nodes (public interface)
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	result := make(map[string]*master_pb.DataNodeInfo)
	for nodeID, node := range at.nodes {
		result[nodeID] = node.nodeInfo
	}
	return result
}
// GetTopologyInfo returns the current topology information (read-only access)
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	return at.topologyInfo
}
// GetNodeDisks returns all disks for a specific node
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	node, exists := at.nodes[nodeID]
	if !exists {
		return nil
	}

	var disks []*DiskInfo
	for _, disk := range node.disks {
		diskCopy := *disk.DiskInfo
		diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
		disks = append(disks, &diskCopy)
	}

	return disks
}
// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
func (at *ActiveTopology) rebuildIndexes() {
	// Clear existing indexes
	at.volumeIndex = make(map[uint32][]string)
	at.ecShardIndex = make(map[uint32][]string)

	// Rebuild indexes from current topology
	for _, dc := range at.topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				for _, diskInfo := range nodeInfo.DiskInfos {
					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)

					// Index volumes
					for _, volumeInfo := range diskInfo.VolumeInfos {
						volumeID := volumeInfo.Id
						at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
					}

					// Index EC shards
					for _, ecShardInfo := range diskInfo.EcShardInfos {
						volumeID := ecShardInfo.Id
						at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
					}
				}
			}
		}
	}
}
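// For reference, a sketch of the index shape this produces (values are
// illustrative, not taken from a real cluster): both maps are keyed by volume
// id and hold "nodeID:diskID" keys that resolve through at.disks.
//
//	volumeIndex:  42 -> ["10.0.0.1:8080:0", "10.0.0.2:8080:1"]
//	ecShardIndex: 37 -> ["10.0.0.3:8080:0", "10.0.0.4:8080:2"]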
// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKeys, exists := at.volumeIndex[volumeID]
	if !exists {
		return []VolumeReplica{}
	}

	var replicas []VolumeReplica
	for _, diskKey := range diskKeys {
		if disk, diskExists := at.disks[diskKey]; diskExists {
			// Verify collection matches (since index doesn't include collection)
			if at.volumeMatchesCollection(disk, volumeID, collection) {
				replicas = append(replicas, VolumeReplica{
					ServerID:   disk.NodeID,
					DiskID:     disk.DiskID,
					DataCenter: disk.DataCenter,
					Rack:       disk.Rack,
				})
			}
		}
	}

	return replicas
}
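// Usage sketch (illustrative only): inspecting how a volume's replicas are
// spread before scheduling work against it. The empty collection name stands
// in for the default collection and is just an example value.
//
//	replicas := at.GetVolumeLocations(volumeID, "")
//	racks := make(map[string]bool)
//	for _, r := range replicas {
//		racks[r.DataCenter+"/"+r.Rack] = true
//	}
//	glog.V(2).Infof("volume %d: %d replicas across %d racks", volumeID, len(replicas), len(racks))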
// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKeys, exists := at.ecShardIndex[volumeID]
	if !exists {
		return []VolumeReplica{}
	}

	var ecShards []VolumeReplica
	for _, diskKey := range diskKeys {
		if disk, diskExists := at.disks[diskKey]; diskExists {
			// Verify collection matches (since index doesn't include collection)
			if at.ecShardMatchesCollection(disk, volumeID, collection) {
				ecShards = append(ecShards, VolumeReplica{
					ServerID:   disk.NodeID,
					DiskID:     disk.DiskID,
					DataCenter: disk.DataCenter,
					Rack:       disk.Rack,
				})
			}
		}
	}

	return ecShards
}
// volumeMatchesCollection checks if a volume on a disk matches the given collection
func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return false
	}

	for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
		if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
			return true
		}
	}

	return false
}
// ecShardMatchesCollection checks if EC shards on a disk match the given collection
func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return false
	}

	for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
		if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
			return true
		}
	}

	return false
}