package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)
// AssignTask moves a task from pending to assigned and reserves capacity
func (at *ActiveTopology) AssignTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.pendingTasks[taskID]
	if !exists {
		return fmt.Errorf("pending task %s not found", taskID)
	}

	// Check if all destination disks have sufficient capacity to reserve
	for _, dest := range task.Destinations {
		targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
		if targetDisk, exists := at.disks[targetKey]; exists {
			availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)

			// Check whether the available capacity can accommodate the requested storage change
			if !availableCapacity.CanAccommodate(dest.StorageChange) {
				return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
					dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
			}
		} else if dest.TargetServer != "" {
			// Fail fast if the destination disk is not found in the topology
			return fmt.Errorf("destination disk %s not found in topology", targetKey)
		}
	}

	// Move the task to assigned and reserve capacity
	delete(at.pendingTasks, taskID)
	task.Status = TaskStatusInProgress
	at.assignedTasks[taskID] = task
	at.reassignTaskStates()

	// Log capacity reservation information for all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}
	glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	return nil
}
// CompleteTask moves a task from assigned to recent and releases reserved capacity.
// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
// should be handled by the master when it receives the task completion notification.
func (at *ActiveTopology) CompleteTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.assignedTasks[taskID]
	if !exists {
		return fmt.Errorf("assigned task %s not found", taskID)
	}

	// Release reserved capacity by moving the task to the completed state
	delete(at.assignedTasks, taskID)
	task.Status = TaskStatusCompleted
	task.CompletedAt = time.Now()
	at.recentTasks[taskID] = task
	at.reassignTaskStates()

	// Log capacity release information for all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}
	glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	// Clean up old recent tasks
	at.cleanupRecentTasks()

	return nil
}
// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion.
// This should be called when the master updates the topology with new VolumeCount information.
func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
		oldCount := disk.DiskInfo.DiskInfo.VolumeCount
		disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
		glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
			diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
	}
}
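
// Example (illustrative only): a minimal sketch of the intended task lifecycle,
// assuming the task was previously registered via AddPendingTask and that the
// caller learns the destination node/disk and the confirmed volume-count delta
// from the master's completion notification. The function name
// exampleTaskLifecycle and its parameters are hypothetical, not part of the
// package API.
func exampleTaskLifecycle(at *ActiveTopology, taskID string, destNode string, destDisk uint32, volumeDelta int64) error {
	// Reserve capacity on the destination disks and mark the task in progress.
	if err := at.AssignTask(taskID); err != nil {
		return err
	}

	// ... the worker executes the task here ...

	// Release the reserved capacity once the worker reports success.
	if err := at.CompleteTask(taskID); err != nil {
		return err
	}

	// Reflect the actual VolumeCount change confirmed by the master.
	at.ApplyActualStorageChange(destNode, destDisk, volumeDelta)
	return nil
}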
// AddPendingTask is the unified function that handles both simple and complex task creation
func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
	// Validation
	if len(spec.Sources) == 0 {
		return fmt.Errorf("at least one source is required")
	}
	if len(spec.Destinations) == 0 {
		return fmt.Errorf("at least one destination is required")
	}

	at.mutex.Lock()
	defer at.mutex.Unlock()

	// Build sources array
	sources := make([]TaskSource, len(spec.Sources))
	for i, sourceSpec := range spec.Sources {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if sourceSpec.StorageImpact != nil {
			// Use manually specified impact
			storageImpact = *sourceSpec.StorageImpact
		} else {
			// Auto-calculate based on task type and cleanup type
			storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
		}

		if sourceSpec.EstimatedSize != nil {
			estimatedSize = *sourceSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		sources[i] = TaskSource{
			SourceServer:  sourceSpec.ServerID,
			SourceDisk:    sourceSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Build destinations array
	destinations := make([]TaskDestination, len(spec.Destinations))
	for i, destSpec := range spec.Destinations {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if destSpec.StorageImpact != nil {
			// Use manually specified impact
			storageImpact = *destSpec.StorageImpact
		} else {
			// Auto-calculate based on task type
			_, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
		}

		if destSpec.EstimatedSize != nil {
			estimatedSize = *destSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		destinations[i] = TaskDestination{
			TargetServer:  destSpec.ServerID,
			TargetDisk:    destSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Create the task
	task := &taskState{
		VolumeID:      spec.VolumeID,
		TaskType:      spec.TaskType,
		Status:        TaskStatusPending,
		StartedAt:     time.Now(),
		EstimatedSize: spec.VolumeSize,
		Sources:       sources,
		Destinations:  destinations,
	}

	at.pendingTasks[spec.TaskID] = task
	at.assignTaskToDisk(task)

	glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
		spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))

	return nil
}
// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
	switch taskType {
	case TaskTypeErasureCoding:
		switch cleanupType {
		case CleanupVolumeReplica:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		case CleanupECShards:
			return CalculateECShardCleanupImpact(volumeSize)
		default:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		}
	default:
		impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
		return impact
	}
}
// SourceCleanupType indicates what type of data needs to be cleaned up from a source
type SourceCleanupType int

const (
	CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
	CleanupECShards                               // Clean up existing EC shards (frees shard slots)
)

// TaskSourceSpec represents a source specification for task creation
type TaskSourceSpec struct {
	ServerID      string
	DiskID        uint32
	DataCenter    string             // Data center of the source server
	Rack          string             // Rack of the source server
	CleanupType   SourceCleanupType  // For EC: volume replica vs. existing shards
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskDestinationSpec represents a destination specification for task creation
type TaskDestinationSpec struct {
	ServerID      string
	DiskID        uint32
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskSpec represents a complete task specification
type TaskSpec struct {
	TaskID       string
	TaskType     TaskType
	VolumeID     uint32
	VolumeSize   int64                 // Used for auto-calculation when manual impacts are not provided
	Sources      []TaskSourceSpec      // Can be single or multiple
	Destinations []TaskDestinationSpec // Can be single or multiple
}
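
// Example (illustrative only): a minimal sketch of building a TaskSpec for an
// erasure-coding task and registering it as pending. The server addresses,
// disk IDs, task ID format, and the helper name exampleAddECTask are assumed
// for illustration; StorageImpact and EstimatedSize are left nil so that
// AddPendingTask auto-calculates them from TaskType and VolumeSize.
func exampleAddECTask(at *ActiveTopology, volumeID uint32, volumeSize int64) error {
	return at.AddPendingTask(TaskSpec{
		TaskID:     fmt.Sprintf("ec-%d", volumeID),
		TaskType:   TaskTypeErasureCoding,
		VolumeID:   volumeID,
		VolumeSize: volumeSize,
		Sources: []TaskSourceSpec{
			// The volume replica on the source disk is cleaned up after encoding.
			{ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica},
		},
		Destinations: []TaskDestinationSpec{
			// EC shards are spread across the destination disks.
			{ServerID: "10.0.0.2:8080", DiskID: 0},
			{ServerID: "10.0.0.3:8080", DiskID: 1},
		},
	})
}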