// File: generic_components.go (3.6 KB)

  1. package base
  2. import (
  3. "time"
  4. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  5. )
  6. // GenericDetector implements TaskDetector using function-based logic
  7. type GenericDetector struct {
  8. taskDef *TaskDefinition
  9. }
  10. // NewGenericDetector creates a detector from a task definition
  11. func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
  12. return &GenericDetector{taskDef: taskDef}
  13. }
  14. // GetTaskType returns the task type
  15. func (d *GenericDetector) GetTaskType() types.TaskType {
  16. return d.taskDef.Type
  17. }
  18. // ScanForTasks scans using the task definition's detection function
  19. func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
  20. if d.taskDef.DetectionFunc == nil {
  21. return nil, nil
  22. }
  23. return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
  24. }
  25. // ScanInterval returns the scan interval from task definition
  26. func (d *GenericDetector) ScanInterval() time.Duration {
  27. if d.taskDef.ScanInterval > 0 {
  28. return d.taskDef.ScanInterval
  29. }
  30. return 30 * time.Minute // Default
  31. }
  32. // IsEnabled returns whether this detector is enabled
  33. func (d *GenericDetector) IsEnabled() bool {
  34. return d.taskDef.Config.IsEnabled()
  35. }
  36. // GenericScheduler implements TaskScheduler using function-based logic
  37. type GenericScheduler struct {
  38. taskDef *TaskDefinition
  39. }
  40. // NewGenericScheduler creates a scheduler from a task definition
  41. func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
  42. return &GenericScheduler{taskDef: taskDef}
  43. }
  44. // GetTaskType returns the task type
  45. func (s *GenericScheduler) GetTaskType() types.TaskType {
  46. return s.taskDef.Type
  47. }
  48. // CanScheduleNow determines if a task can be scheduled using the task definition's function
  49. func (s *GenericScheduler) CanScheduleNow(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData) bool {
  50. if s.taskDef.SchedulingFunc == nil {
  51. return s.defaultCanSchedule(task, runningTasks, availableWorkers)
  52. }
  53. return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
  54. }
  55. // defaultCanSchedule provides default scheduling logic
  56. func (s *GenericScheduler) defaultCanSchedule(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData) bool {
  57. if !s.taskDef.Config.IsEnabled() {
  58. return false
  59. }
  60. // Count running tasks of this type
  61. runningCount := 0
  62. for _, runningTask := range runningTasks {
  63. if runningTask.Type == s.taskDef.Type {
  64. runningCount++
  65. }
  66. }
  67. // Check concurrency limit
  68. maxConcurrent := s.taskDef.MaxConcurrent
  69. if maxConcurrent <= 0 {
  70. maxConcurrent = 1 // Default
  71. }
  72. if runningCount >= maxConcurrent {
  73. return false
  74. }
  75. // Check if we have available workers
  76. for _, worker := range availableWorkers {
  77. if worker.CurrentLoad < worker.MaxConcurrent {
  78. for _, capability := range worker.Capabilities {
  79. if capability == s.taskDef.Type {
  80. return true
  81. }
  82. }
  83. }
  84. }
  85. return false
  86. }
  87. // GetPriority returns the priority for this task
  88. func (s *GenericScheduler) GetPriority(task *types.TaskInput) types.TaskPriority {
  89. return task.Priority
  90. }
  91. // GetMaxConcurrent returns max concurrent tasks
  92. func (s *GenericScheduler) GetMaxConcurrent() int {
  93. if s.taskDef.MaxConcurrent > 0 {
  94. return s.taskDef.MaxConcurrent
  95. }
  96. return 1 // Default
  97. }
  98. // GetDefaultRepeatInterval returns the default repeat interval
  99. func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
  100. if s.taskDef.RepeatInterval > 0 {
  101. return s.taskDef.RepeatInterval
  102. }
  103. return 24 * time.Hour // Default
  104. }
  105. // IsEnabled returns whether this scheduler is enabled
  106. func (s *GenericScheduler) IsEnabled() bool {
  107. return s.taskDef.Config.IsEnabled()
  108. }