task_management.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package topology
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. )
  7. // AssignTask moves a task from pending to assigned and reserves capacity
  8. func (at *ActiveTopology) AssignTask(taskID string) error {
  9. at.mutex.Lock()
  10. defer at.mutex.Unlock()
  11. task, exists := at.pendingTasks[taskID]
  12. if !exists {
  13. return fmt.Errorf("pending task %s not found", taskID)
  14. }
  15. // Check if all destination disks have sufficient capacity to reserve
  16. for _, dest := range task.Destinations {
  17. targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
  18. if targetDisk, exists := at.disks[targetKey]; exists {
  19. availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
  20. // Check if we have enough total capacity using the improved unified comparison
  21. if !availableCapacity.CanAccommodate(dest.StorageChange) {
  22. return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
  23. dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
  24. }
  25. } else if dest.TargetServer != "" {
  26. // Fail fast if destination disk is not found in topology
  27. return fmt.Errorf("destination disk %s not found in topology", targetKey)
  28. }
  29. }
  30. // Move task to assigned and reserve capacity
  31. delete(at.pendingTasks, taskID)
  32. task.Status = TaskStatusInProgress
  33. at.assignedTasks[taskID] = task
  34. at.reassignTaskStates()
  35. // Log capacity reservation information for all sources and destinations
  36. totalSourceImpact := StorageSlotChange{}
  37. totalDestImpact := StorageSlotChange{}
  38. for _, source := range task.Sources {
  39. totalSourceImpact.AddInPlace(source.StorageChange)
  40. }
  41. for _, dest := range task.Destinations {
  42. totalDestImpact.AddInPlace(dest.StorageChange)
  43. }
  44. glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
  45. taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
  46. len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
  47. return nil
  48. }
  49. // CompleteTask moves a task from assigned to recent and releases reserved capacity
  50. // NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
  51. // should be handled by the master when it receives the task completion notification.
  52. func (at *ActiveTopology) CompleteTask(taskID string) error {
  53. at.mutex.Lock()
  54. defer at.mutex.Unlock()
  55. task, exists := at.assignedTasks[taskID]
  56. if !exists {
  57. return fmt.Errorf("assigned task %s not found", taskID)
  58. }
  59. // Release reserved capacity by moving task to completed state
  60. delete(at.assignedTasks, taskID)
  61. task.Status = TaskStatusCompleted
  62. task.CompletedAt = time.Now()
  63. at.recentTasks[taskID] = task
  64. at.reassignTaskStates()
  65. // Log capacity release information for all sources and destinations
  66. totalSourceImpact := StorageSlotChange{}
  67. totalDestImpact := StorageSlotChange{}
  68. for _, source := range task.Sources {
  69. totalSourceImpact.AddInPlace(source.StorageChange)
  70. }
  71. for _, dest := range task.Destinations {
  72. totalDestImpact.AddInPlace(dest.StorageChange)
  73. }
  74. glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
  75. taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
  76. len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
  77. // Clean up old recent tasks
  78. at.cleanupRecentTasks()
  79. return nil
  80. }
  81. // ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
  82. // This should be called when the master updates the topology with new VolumeCount information
  83. func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
  84. at.mutex.Lock()
  85. defer at.mutex.Unlock()
  86. diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
  87. if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
  88. oldCount := disk.DiskInfo.DiskInfo.VolumeCount
  89. disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
  90. glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
  91. diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
  92. }
  93. }
  94. // AddPendingTask is the unified function that handles both simple and complex task creation
  95. func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
  96. // Validation
  97. if len(spec.Sources) == 0 {
  98. return fmt.Errorf("at least one source is required")
  99. }
  100. if len(spec.Destinations) == 0 {
  101. return fmt.Errorf("at least one destination is required")
  102. }
  103. at.mutex.Lock()
  104. defer at.mutex.Unlock()
  105. // Build sources array
  106. sources := make([]TaskSource, len(spec.Sources))
  107. for i, sourceSpec := range spec.Sources {
  108. var storageImpact StorageSlotChange
  109. var estimatedSize int64
  110. if sourceSpec.StorageImpact != nil {
  111. // Use manually specified impact
  112. storageImpact = *sourceSpec.StorageImpact
  113. } else {
  114. // Auto-calculate based on task type and cleanup type
  115. storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
  116. }
  117. if sourceSpec.EstimatedSize != nil {
  118. estimatedSize = *sourceSpec.EstimatedSize
  119. } else {
  120. estimatedSize = spec.VolumeSize // Default to volume size
  121. }
  122. sources[i] = TaskSource{
  123. SourceServer: sourceSpec.ServerID,
  124. SourceDisk: sourceSpec.DiskID,
  125. StorageChange: storageImpact,
  126. EstimatedSize: estimatedSize,
  127. }
  128. }
  129. // Build destinations array
  130. destinations := make([]TaskDestination, len(spec.Destinations))
  131. for i, destSpec := range spec.Destinations {
  132. var storageImpact StorageSlotChange
  133. var estimatedSize int64
  134. if destSpec.StorageImpact != nil {
  135. // Use manually specified impact
  136. storageImpact = *destSpec.StorageImpact
  137. } else {
  138. // Auto-calculate based on task type
  139. _, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
  140. }
  141. if destSpec.EstimatedSize != nil {
  142. estimatedSize = *destSpec.EstimatedSize
  143. } else {
  144. estimatedSize = spec.VolumeSize // Default to volume size
  145. }
  146. destinations[i] = TaskDestination{
  147. TargetServer: destSpec.ServerID,
  148. TargetDisk: destSpec.DiskID,
  149. StorageChange: storageImpact,
  150. EstimatedSize: estimatedSize,
  151. }
  152. }
  153. // Create the task
  154. task := &taskState{
  155. VolumeID: spec.VolumeID,
  156. TaskType: spec.TaskType,
  157. Status: TaskStatusPending,
  158. StartedAt: time.Now(),
  159. EstimatedSize: spec.VolumeSize,
  160. Sources: sources,
  161. Destinations: destinations,
  162. }
  163. at.pendingTasks[spec.TaskID] = task
  164. at.assignTaskToDisk(task)
  165. glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
  166. spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))
  167. return nil
  168. }
  169. // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
  170. func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
  171. switch taskType {
  172. case TaskTypeErasureCoding:
  173. switch cleanupType {
  174. case CleanupVolumeReplica:
  175. impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
  176. return impact
  177. case CleanupECShards:
  178. return CalculateECShardCleanupImpact(volumeSize)
  179. default:
  180. impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
  181. return impact
  182. }
  183. default:
  184. impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
  185. return impact
  186. }
  187. }
// SourceCleanupType indicates what type of data needs to be cleaned up from a source.
type SourceCleanupType int

const (
	// CleanupVolumeReplica cleans up a volume replica (frees volume slots).
	CleanupVolumeReplica SourceCleanupType = iota
	// CleanupECShards cleans up existing EC shards (frees shard slots).
	CleanupECShards
)
// TaskSourceSpec represents a source specification for task creation.
// StorageImpact and EstimatedSize are optional; when nil they are
// auto-calculated from the task type and volume size (see AddPendingTask).
type TaskSourceSpec struct {
	ServerID      string
	DiskID        uint32
	DataCenter    string             // Data center of the source server
	Rack          string             // Rack of the source server
	CleanupType   SourceCleanupType  // For EC: volume replica vs existing shards
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}
// TaskDestinationSpec represents a destination specification for task creation.
// StorageImpact and EstimatedSize are optional; when nil they are
// auto-calculated from the task type and volume size (see AddPendingTask).
type TaskDestinationSpec struct {
	ServerID      string
	DiskID        uint32
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}
// TaskSpec represents a complete task specification.
type TaskSpec struct {
	TaskID       string
	TaskType     TaskType
	VolumeID     uint32
	VolumeSize   int64                 // Used for auto-calculation when manual impacts not provided
	Sources      []TaskSourceSpec      // Can be single or multiple
	Destinations []TaskDestinationSpec // Can be single or multiple
}