| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package topology
- import (
- "fmt"
- "time"
- )
- // reassignTaskStates assigns tasks to the appropriate disks
- func (at *ActiveTopology) reassignTaskStates() {
- // Clear existing task assignments
- for _, disk := range at.disks {
- disk.pendingTasks = nil
- disk.assignedTasks = nil
- disk.recentTasks = nil
- }
- // Reassign pending tasks
- for _, task := range at.pendingTasks {
- at.assignTaskToDisk(task)
- }
- // Reassign assigned tasks
- for _, task := range at.assignedTasks {
- at.assignTaskToDisk(task)
- }
- // Reassign recent tasks
- for _, task := range at.recentTasks {
- at.assignTaskToDisk(task)
- }
- }
- // assignTaskToDisk assigns a task to the appropriate disk(s)
- func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
- addedDisks := make(map[string]bool)
- // Local helper function to assign task to a disk and avoid code duplication
- assign := func(server string, diskID uint32) {
- key := fmt.Sprintf("%s:%d", server, diskID)
- if server == "" || addedDisks[key] {
- return
- }
- if disk, exists := at.disks[key]; exists {
- switch task.Status {
- case TaskStatusPending:
- disk.pendingTasks = append(disk.pendingTasks, task)
- case TaskStatusInProgress:
- disk.assignedTasks = append(disk.assignedTasks, task)
- case TaskStatusCompleted:
- disk.recentTasks = append(disk.recentTasks, task)
- }
- addedDisks[key] = true
- }
- }
- // Assign to all source disks
- for _, source := range task.Sources {
- assign(source.SourceServer, source.SourceDisk)
- }
- // Assign to all destination disks (duplicates automatically avoided by helper)
- for _, dest := range task.Destinations {
- assign(dest.TargetServer, dest.TargetDisk)
- }
- }
- // isDiskAvailable checks if a disk can accept new tasks
- func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
- // Check if disk has too many pending and active tasks
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- if activeLoad >= MaxConcurrentTasksPerDisk {
- return false
- }
- // Check for conflicting task types
- for _, task := range disk.assignedTasks {
- if at.areTaskTypesConflicting(task.TaskType, taskType) {
- return false
- }
- }
- return true
- }
- // areTaskTypesConflicting checks if two task types conflict
- func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
- // Examples of conflicting task types
- conflictMap := map[TaskType][]TaskType{
- TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
- TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
- TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
- }
- if conflicts, exists := conflictMap[existing]; exists {
- for _, conflictType := range conflicts {
- if conflictType == new {
- return true
- }
- }
- }
- return false
- }
- // cleanupRecentTasks removes old recent tasks
- func (at *ActiveTopology) cleanupRecentTasks() {
- cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
- for taskID, task := range at.recentTasks {
- if task.CompletedAt.Before(cutoff) {
- delete(at.recentTasks, taskID)
- }
- }
- }
|