package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk.
// It considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
//   - Pending tasks: reserve capacity immediately when added
//   - Assigned tasks: continue to reserve capacity during execution
//   - Recently completed tasks: NOT counted against capacity
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}

	// Same logic as getEffectiveAvailableCapacityUnsafe, run under the read lock.
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as a StorageSlotChange.
// This provides granular information about available volume slots and shard slots.
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	return at.getEffectiveAvailableCapacityUnsafe(disk)
}

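// Illustrative sketch, not part of the original file: how a caller might read
// both capacity views above. The node address "10.0.0.1:8080" and disk ID 0
// are hypothetical placeholders.
func exampleReadCapacity(at *ActiveTopology) {
	// Coarse view: whole volume slots only.
	volumeSlots := at.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)

	// Detailed view: volume slots plus EC shard slots.
	detailed := at.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0)

	fmt.Printf("coarse=%d detailed=%+v\n", volumeSlots, detailed)
}
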
// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk.
// This shows the net impact from all pending and assigned tasks.
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	return at.getEffectiveCapacityUnsafe(disk)
}

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity.
// This method considers BOTH pending and assigned tasks for capacity reservation, using StorageSlotChange.
//
// Parameters:
//   - taskType: type of task to check compatibility for
//   - excludeNodeID: node to exclude from results
//   - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // skip excluded node
		}
		if !at.isDiskAvailable(disk, taskType) {
			continue
		}

		effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
		// Only include disks that meet the minimum capacity requirement.
		if int64(effectiveCapacity.VolumeSlots) < minCapacity {
			continue
		}

		// Create a new DiskInfo with current capacity information.
		diskCopy := DiskInfo{
			NodeID:     disk.DiskInfo.NodeID,
			DiskID:     disk.DiskInfo.DiskID,
			DiskType:   disk.DiskInfo.DiskType,
			DataCenter: disk.DiskInfo.DataCenter,
			Rack:       disk.DiskInfo.Rack,
			LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // count all tasks
		}

		// Create a new protobuf DiskInfo to avoid modifying the original;
		// VolumeCount is rewritten to reflect the reserved capacity.
		diskCopy.DiskInfo = &master_pb.DiskInfo{
			DiskId:            disk.DiskInfo.DiskInfo.DiskId,
			MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
			VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
			VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
			EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
			RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
			ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
			FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
		}
		available = append(available, &diskCopy)
	}
	return available
}

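// Illustrative sketch, not part of the original file: picking a destination
// disk for a task. The minimum-capacity threshold of 1 volume slot is a
// hypothetical choice.
func examplePickDestination(at *ActiveTopology, taskType TaskType, sourceNode string) *DiskInfo {
	candidates := at.GetDisksWithEffectiveCapacity(taskType, sourceNode, 1)
	if len(candidates) == 0 {
		return nil // no disk can take even one more volume
	}
	// A real scheduler would rank candidates (e.g., by LoadCount or free slots);
	// this sketch just takes the first one.
	return candidates[0]
}
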
// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions.
// This helps avoid over-scheduling tasks onto the same disk.
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // skip excluded node
		}
		// Consider both pending and active tasks for scheduling decisions.
		if !at.isDiskAvailableForPlanning(disk, taskType) {
			continue
		}

		// Check whether the disk can accommodate a new task, considering pending tasks.
		planningCapacity := at.getPlanningCapacityUnsafe(disk)
		if int64(planningCapacity.VolumeSlots) < minCapacity {
			continue
		}

		// Create a new DiskInfo with planning information.
		diskCopy := DiskInfo{
			NodeID:     disk.DiskInfo.NodeID,
			DiskID:     disk.DiskInfo.DiskID,
			DiskType:   disk.DiskInfo.DiskType,
			DataCenter: disk.DiskInfo.DataCenter,
			Rack:       disk.DiskInfo.Rack,
			LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
		}

		// Create a new protobuf DiskInfo to avoid modifying the original.
		diskCopy.DiskInfo = &master_pb.DiskInfo{
			DiskId:            disk.DiskInfo.DiskInfo.DiskId,
			MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
			VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
			VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
			EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
			RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
			ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
			FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
		}
		available = append(available, &diskCopy)
	}
	return available
}

// CanAccommodateTask checks whether a disk can accommodate a new task, considering all constraints.
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}

	// Check basic availability.
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}

	// Check effective capacity.
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}

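// Illustrative sketch, not part of the original file: using CanAccommodateTask
// as a final guard before assigning a task. The node address, disk ID, and
// slot count are hypothetical placeholders.
func exampleGuardAssignment(at *ActiveTopology, taskType TaskType) bool {
	const volumesNeeded = 1
	if !at.CanAccommodateTask("10.0.0.2:8080", 1, taskType, volumesNeeded) {
		return false // capacity or availability constraints reject the assignment
	}
	// ...proceed to create and assign the task...
	return true
}
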
// getPlanningCapacityUnsafe considers both pending and active tasks for planning.
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount

	// Use the centralized helper function to calculate task storage impact.
	totalImpact := at.calculateTaskStorageImpact(disk)

	// Calculate available capacity considering impact (negative impact reduces availability).
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information.
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // available shard capacity (negative impact becomes positive availability)
	}
}

// isDiskAvailableForPlanning checks whether a disk can accept new tasks, considering pending load.
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check total load, including pending tasks.
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}

	// Check for conflicting task types in active tasks only.
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}

// calculateTaskStorageImpact is a helper function that calculates the total storage impact
// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
// between multiple capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	totalImpact := StorageSlotChange{}

	// Process both pending and assigned tasks with identical logic.
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Accumulate impact from all source locations on this disk.
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}
			// Accumulate impact from all destination locations on this disk.
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}

	return totalImpact
}

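// Worked sketch (hypothetical values, not from the source): if a task's source
// entry on this disk carries StorageChange{VolumeSlots: -1} and its destination
// entry on the same disk carries StorageChange{ShardSlots: 10}, the helper
// returns their sum, StorageSlotChange{VolumeSlots: -1, ShardSlots: 10}, which
// the capacity functions then apply against the disk's base availability.
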
// getEffectiveCapacityUnsafe returns the effective capacity impact without locking (for internal use).
// It returns a StorageSlotChange representing the net impact from all tasks.
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as a StorageSlotChange.
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)

	// Calculate available volume slots (negative impact reduces availability).
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information.
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // available shard capacity (negative impact becomes positive availability)
	}
}

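// Worked example (hypothetical numbers): with MaxVolumeCount=100 and
// VolumeCount=90, the base availability is 10 volume slots. If the net task
// impact converts to 2 reserved volume slots, the function reports
// StorageSlotChange{VolumeSlots: 8}; a result below zero clamps to 0 rather
// than reporting over-commitment.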