| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- package base
- import (
- "time"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
- )
- // GenericDetector implements TaskDetector using function-based logic
- type GenericDetector struct {
- taskDef *TaskDefinition
- }
- // NewGenericDetector creates a detector from a task definition
- func NewGenericDetector(taskDef *TaskDefinition) *GenericDetector {
- return &GenericDetector{taskDef: taskDef}
- }
- // GetTaskType returns the task type
- func (d *GenericDetector) GetTaskType() types.TaskType {
- return d.taskDef.Type
- }
- // ScanForTasks scans using the task definition's detection function
- func (d *GenericDetector) ScanForTasks(volumeMetrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) ([]*types.TaskDetectionResult, error) {
- if d.taskDef.DetectionFunc == nil {
- return nil, nil
- }
- return d.taskDef.DetectionFunc(volumeMetrics, clusterInfo, d.taskDef.Config)
- }
- // ScanInterval returns the scan interval from task definition
- func (d *GenericDetector) ScanInterval() time.Duration {
- if d.taskDef.ScanInterval > 0 {
- return d.taskDef.ScanInterval
- }
- return 30 * time.Minute // Default
- }
- // IsEnabled returns whether this detector is enabled
- func (d *GenericDetector) IsEnabled() bool {
- return d.taskDef.Config.IsEnabled()
- }
- // GenericScheduler implements TaskScheduler using function-based logic
- type GenericScheduler struct {
- taskDef *TaskDefinition
- }
- // NewGenericScheduler creates a scheduler from a task definition
- func NewGenericScheduler(taskDef *TaskDefinition) *GenericScheduler {
- return &GenericScheduler{taskDef: taskDef}
- }
- // GetTaskType returns the task type
- func (s *GenericScheduler) GetTaskType() types.TaskType {
- return s.taskDef.Type
- }
- // CanScheduleNow determines if a task can be scheduled using the task definition's function
- func (s *GenericScheduler) CanScheduleNow(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData) bool {
- if s.taskDef.SchedulingFunc == nil {
- return s.defaultCanSchedule(task, runningTasks, availableWorkers)
- }
- return s.taskDef.SchedulingFunc(task, runningTasks, availableWorkers, s.taskDef.Config)
- }
- // defaultCanSchedule provides default scheduling logic
- func (s *GenericScheduler) defaultCanSchedule(task *types.TaskInput, runningTasks []*types.TaskInput, availableWorkers []*types.WorkerData) bool {
- if !s.taskDef.Config.IsEnabled() {
- return false
- }
- // Count running tasks of this type
- runningCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == s.taskDef.Type {
- runningCount++
- }
- }
- // Check concurrency limit
- maxConcurrent := s.taskDef.MaxConcurrent
- if maxConcurrent <= 0 {
- maxConcurrent = 1 // Default
- }
- if runningCount >= maxConcurrent {
- return false
- }
- // Check if we have available workers
- for _, worker := range availableWorkers {
- if worker.CurrentLoad < worker.MaxConcurrent {
- for _, capability := range worker.Capabilities {
- if capability == s.taskDef.Type {
- return true
- }
- }
- }
- }
- return false
- }
- // GetPriority returns the priority for this task
- func (s *GenericScheduler) GetPriority(task *types.TaskInput) types.TaskPriority {
- return task.Priority
- }
- // GetMaxConcurrent returns max concurrent tasks
- func (s *GenericScheduler) GetMaxConcurrent() int {
- if s.taskDef.MaxConcurrent > 0 {
- return s.taskDef.MaxConcurrent
- }
- return 1 // Default
- }
- // GetDefaultRepeatInterval returns the default repeat interval
- func (s *GenericScheduler) GetDefaultRepeatInterval() time.Duration {
- if s.taskDef.RepeatInterval > 0 {
- return s.taskDef.RepeatInterval
- }
- return 24 * time.Hour // Default
- }
- // IsEnabled returns whether this scheduler is enabled
- func (s *GenericScheduler) IsEnabled() bool {
- return s.taskDef.Config.IsEnabled()
- }
|