| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
- package maintenance
- import (
- "time"
- "github.com/seaweedfs/seaweedfs/weed/admin/topology"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
- "github.com/seaweedfs/seaweedfs/weed/worker/types"
- )
- // MaintenanceIntegration bridges the task system with existing maintenance
- type MaintenanceIntegration struct {
- taskRegistry *types.TaskRegistry
- uiRegistry *types.UIRegistry
- // Bridge to existing system
- maintenanceQueue *MaintenanceQueue
- maintenancePolicy *MaintenancePolicy
- // Pending operations tracker
- pendingOperations *PendingOperations
- // Active topology for task detection and target selection
- activeTopology *topology.ActiveTopology
- // Type conversion maps
- taskTypeMap map[types.TaskType]MaintenanceTaskType
- revTaskTypeMap map[MaintenanceTaskType]types.TaskType
- priorityMap map[types.TaskPriority]MaintenanceTaskPriority
- revPriorityMap map[MaintenanceTaskPriority]types.TaskPriority
- }
- // NewMaintenanceIntegration creates the integration bridge
- func NewMaintenanceIntegration(queue *MaintenanceQueue, policy *MaintenancePolicy) *MaintenanceIntegration {
- integration := &MaintenanceIntegration{
- taskRegistry: tasks.GetGlobalTypesRegistry(), // Use global types registry with auto-registered tasks
- uiRegistry: tasks.GetGlobalUIRegistry(), // Use global UI registry with auto-registered UI providers
- maintenanceQueue: queue,
- maintenancePolicy: policy,
- pendingOperations: NewPendingOperations(),
- }
- // Initialize active topology with 10 second recent task window
- integration.activeTopology = topology.NewActiveTopology(10)
- // Initialize type conversion maps
- integration.initializeTypeMaps()
- // Register all tasks
- integration.registerAllTasks()
- return integration
- }
- // initializeTypeMaps creates the type conversion maps for dynamic conversion
- func (s *MaintenanceIntegration) initializeTypeMaps() {
- // Initialize empty maps
- s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
- s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
- // Build task type mappings dynamically from registered tasks after registration
- // This will be called from registerAllTasks() after all tasks are registered
- // Priority mappings (these are static and don't depend on registered tasks)
- s.priorityMap = map[types.TaskPriority]MaintenanceTaskPriority{
- types.TaskPriorityLow: PriorityLow,
- types.TaskPriorityNormal: PriorityNormal,
- types.TaskPriorityHigh: PriorityHigh,
- }
- // Reverse priority mappings
- s.revPriorityMap = map[MaintenanceTaskPriority]types.TaskPriority{
- PriorityLow: types.TaskPriorityLow,
- PriorityNormal: types.TaskPriorityNormal,
- PriorityHigh: types.TaskPriorityHigh,
- PriorityCritical: types.TaskPriorityHigh, // Map critical to high
- }
- }
- // buildTaskTypeMappings dynamically builds task type mappings from registered tasks
- func (s *MaintenanceIntegration) buildTaskTypeMappings() {
- // Clear existing mappings
- s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
- s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
- // Build mappings from registered detectors
- for workerTaskType := range s.taskRegistry.GetAllDetectors() {
- // Convert types.TaskType to MaintenanceTaskType by string conversion
- maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
- s.taskTypeMap[workerTaskType] = maintenanceTaskType
- s.revTaskTypeMap[maintenanceTaskType] = workerTaskType
- glog.V(3).Infof("Dynamically mapped task type: %s <-> %s", workerTaskType, maintenanceTaskType)
- }
- glog.V(2).Infof("Built %d dynamic task type mappings", len(s.taskTypeMap))
- }
- // registerAllTasks registers all available tasks
- func (s *MaintenanceIntegration) registerAllTasks() {
- // Tasks are already auto-registered via import statements
- // No manual registration needed
- // Build dynamic type mappings from registered tasks
- s.buildTaskTypeMappings()
- // Configure tasks from policy
- s.ConfigureTasksFromPolicy()
- registeredTaskTypes := make([]string, 0, len(s.taskTypeMap))
- for _, maintenanceTaskType := range s.taskTypeMap {
- registeredTaskTypes = append(registeredTaskTypes, string(maintenanceTaskType))
- }
- glog.V(1).Infof("Registered tasks: %v", registeredTaskTypes)
- }
- // ConfigureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy
- func (s *MaintenanceIntegration) ConfigureTasksFromPolicy() {
- if s.maintenancePolicy == nil {
- return
- }
- // Configure all registered detectors and schedulers dynamically using policy configuration
- configuredCount := 0
- // Get all registered task types from the registry
- for taskType, detector := range s.taskRegistry.GetAllDetectors() {
- // Configure detector using policy-based configuration
- s.configureDetectorFromPolicy(taskType, detector)
- configuredCount++
- }
- for taskType, scheduler := range s.taskRegistry.GetAllSchedulers() {
- // Configure scheduler using policy-based configuration
- s.configureSchedulerFromPolicy(taskType, scheduler)
- }
- glog.V(1).Infof("Dynamically configured %d task types from maintenance policy", configuredCount)
- }
- // configureDetectorFromPolicy configures a detector using policy-based configuration
- func (s *MaintenanceIntegration) configureDetectorFromPolicy(taskType types.TaskType, detector types.TaskDetector) {
- // Try to configure using PolicyConfigurableDetector interface if supported
- if configurableDetector, ok := detector.(types.PolicyConfigurableDetector); ok {
- configurableDetector.ConfigureFromPolicy(s.maintenancePolicy)
- glog.V(2).Infof("Configured detector %s using policy interface", taskType)
- return
- }
- // Apply basic configuration that all detectors should support
- if basicDetector, ok := detector.(interface{ SetEnabled(bool) }); ok {
- // Convert task system type to maintenance task type for policy lookup
- maintenanceTaskType, exists := s.taskTypeMap[taskType]
- if exists {
- enabled := IsTaskEnabled(s.maintenancePolicy, maintenanceTaskType)
- basicDetector.SetEnabled(enabled)
- glog.V(3).Infof("Set enabled=%v for detector %s", enabled, taskType)
- }
- }
- // For detectors that don't implement PolicyConfigurableDetector interface,
- // they should be updated to implement it for full policy-based configuration
- glog.V(2).Infof("Detector %s should implement PolicyConfigurableDetector interface for full policy support", taskType)
- }
- // configureSchedulerFromPolicy configures a scheduler using policy-based configuration
- func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.TaskType, scheduler types.TaskScheduler) {
- // Try to configure using PolicyConfigurableScheduler interface if supported
- if configurableScheduler, ok := scheduler.(types.PolicyConfigurableScheduler); ok {
- configurableScheduler.ConfigureFromPolicy(s.maintenancePolicy)
- glog.V(2).Infof("Configured scheduler %s using policy interface", taskType)
- return
- }
- // Apply basic configuration that all schedulers should support
- maintenanceTaskType, exists := s.taskTypeMap[taskType]
- if !exists {
- glog.V(3).Infof("No maintenance task type mapping for %s, skipping configuration", taskType)
- return
- }
- // Set enabled status if scheduler supports it
- if enableableScheduler, ok := scheduler.(interface{ SetEnabled(bool) }); ok {
- enabled := IsTaskEnabled(s.maintenancePolicy, maintenanceTaskType)
- enableableScheduler.SetEnabled(enabled)
- glog.V(3).Infof("Set enabled=%v for scheduler %s", enabled, taskType)
- }
- // Set max concurrent if scheduler supports it
- if concurrentScheduler, ok := scheduler.(interface{ SetMaxConcurrent(int) }); ok {
- maxConcurrent := GetMaxConcurrent(s.maintenancePolicy, maintenanceTaskType)
- if maxConcurrent > 0 {
- concurrentScheduler.SetMaxConcurrent(maxConcurrent)
- glog.V(3).Infof("Set max concurrent=%d for scheduler %s", maxConcurrent, taskType)
- }
- }
- // For schedulers that don't implement PolicyConfigurableScheduler interface,
- // they should be updated to implement it for full policy-based configuration
- glog.V(2).Infof("Scheduler %s should implement PolicyConfigurableScheduler interface for full policy support", taskType)
- }
- // ScanWithTaskDetectors performs a scan using the task system
- func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.VolumeHealthMetrics) ([]*TaskDetectionResult, error) {
- // Note: ActiveTopology gets updated from topology info instead of volume metrics
- glog.V(2).Infof("Processed %d volume metrics for task detection", len(volumeMetrics))
- // Filter out volumes with pending operations to avoid duplicates
- filteredMetrics := s.pendingOperations.FilterVolumeMetricsExcludingPending(volumeMetrics)
- glog.V(1).Infof("Scanning %d volumes (filtered from %d) excluding pending operations",
- len(filteredMetrics), len(volumeMetrics))
- var allResults []*TaskDetectionResult
- // Create cluster info
- clusterInfo := &types.ClusterInfo{
- TotalVolumes: len(filteredMetrics),
- LastUpdated: time.Now(),
- ActiveTopology: s.activeTopology, // Provide ActiveTopology for destination planning
- }
- // Run detection for each registered task type
- for taskType, detector := range s.taskRegistry.GetAllDetectors() {
- if !detector.IsEnabled() {
- continue
- }
- glog.V(2).Infof("Running detection for task type: %s", taskType)
- results, err := detector.ScanForTasks(filteredMetrics, clusterInfo)
- if err != nil {
- glog.Errorf("Failed to scan for %s tasks: %v", taskType, err)
- continue
- }
- // Convert results to existing system format and check for conflicts
- for _, result := range results {
- existingResult := s.convertToExistingFormat(result)
- if existingResult != nil {
- // Double-check for conflicts with pending operations
- opType := s.mapMaintenanceTaskTypeToPendingOperationType(existingResult.TaskType)
- if !s.pendingOperations.WouldConflictWithPending(existingResult.VolumeID, opType) {
- // All task types should now have TypedParams populated during detection phase
- if existingResult.TypedParams == nil {
- glog.Warningf("Task %s for volume %d has no typed parameters - skipping (task parameter creation may have failed)",
- existingResult.TaskType, existingResult.VolumeID)
- continue
- }
- allResults = append(allResults, existingResult)
- } else {
- glog.V(2).Infof("Skipping task %s for volume %d due to conflict with pending operation",
- existingResult.TaskType, existingResult.VolumeID)
- }
- }
- }
- glog.V(2).Infof("Found %d %s tasks", len(results), taskType)
- }
- return allResults, nil
- }
- // UpdateTopologyInfo updates the volume shard tracker with topology information for empty servers
- func (s *MaintenanceIntegration) UpdateTopologyInfo(topologyInfo *master_pb.TopologyInfo) error {
- return s.activeTopology.UpdateTopology(topologyInfo)
- }
- // convertToExistingFormat converts task results to existing system format using dynamic mapping
- func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetectionResult) *TaskDetectionResult {
- // Convert types using mapping tables
- existingType, exists := s.taskTypeMap[result.TaskType]
- if !exists {
- glog.Warningf("Unknown task type %s, skipping conversion", result.TaskType)
- // Return nil to indicate conversion failed - caller should handle this
- return nil
- }
- existingPriority, exists := s.priorityMap[result.Priority]
- if !exists {
- glog.Warningf("Unknown priority %s, defaulting to normal", result.Priority)
- existingPriority = PriorityNormal
- }
- return &TaskDetectionResult{
- TaskType: existingType,
- VolumeID: result.VolumeID,
- Server: result.Server,
- Collection: result.Collection,
- Priority: existingPriority,
- Reason: result.Reason,
- TypedParams: result.TypedParams,
- ScheduleAt: result.ScheduleAt,
- }
- }
- // CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion
- func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool {
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Checking task %s (type: %s)", task.ID, task.Type)
- // Convert existing types to task types using mapping
- taskType, exists := s.revTaskTypeMap[task.Type]
- if !exists {
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Unknown task type %s for scheduling, falling back to existing logic", task.Type)
- return false // Fallback to existing logic for unknown types
- }
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Mapped task type %s to %s", task.Type, taskType)
- // Convert task objects
- taskObject := s.convertTaskToTaskSystem(task)
- if taskObject == nil {
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Failed to convert task %s for scheduling", task.ID)
- return false
- }
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Successfully converted task %s", task.ID)
- runningTaskObjects := s.convertTasksToTaskSystem(runningTasks)
- workerObjects := s.convertWorkersToTaskSystem(availableWorkers)
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Converted %d running tasks and %d workers", len(runningTaskObjects), len(workerObjects))
- // Get the appropriate scheduler
- scheduler := s.taskRegistry.GetScheduler(taskType)
- if scheduler == nil {
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: No scheduler found for task type %s", taskType)
- return false
- }
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Found scheduler for task type %s", taskType)
- canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
- glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Scheduler decision for task %s: %v", task.ID, canSchedule)
- return canSchedule
- }
- // convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
- func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.TaskInput {
- // Convert task type using mapping
- taskType, exists := s.revTaskTypeMap[task.Type]
- if !exists {
- glog.Errorf("Unknown task type %s in conversion, cannot convert task", task.Type)
- // Return nil to indicate conversion failed
- return nil
- }
- // Convert priority using mapping
- priority, exists := s.revPriorityMap[task.Priority]
- if !exists {
- glog.Warningf("Unknown priority %d in conversion, defaulting to normal", task.Priority)
- priority = types.TaskPriorityNormal
- }
- return &types.TaskInput{
- ID: task.ID,
- Type: taskType,
- Priority: priority,
- VolumeID: task.VolumeID,
- Server: task.Server,
- Collection: task.Collection,
- TypedParams: task.TypedParams,
- CreatedAt: task.CreatedAt,
- }
- }
- // convertTasksToTaskSystem converts multiple tasks
- func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.TaskInput {
- var result []*types.TaskInput
- for _, task := range tasks {
- converted := s.convertTaskToTaskSystem(task)
- if converted != nil {
- result = append(result, converted)
- }
- }
- return result
- }
- // convertWorkersToTaskSystem converts workers to task system format using dynamic mapping
- func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.WorkerData {
- var result []*types.WorkerData
- for _, worker := range workers {
- capabilities := make([]types.TaskType, 0, len(worker.Capabilities))
- for _, cap := range worker.Capabilities {
- // Convert capability using mapping
- taskType, exists := s.revTaskTypeMap[cap]
- if exists {
- capabilities = append(capabilities, taskType)
- } else {
- glog.V(3).Infof("Unknown capability %s for worker %s, skipping", cap, worker.ID)
- }
- }
- result = append(result, &types.WorkerData{
- ID: worker.ID,
- Address: worker.Address,
- Capabilities: capabilities,
- MaxConcurrent: worker.MaxConcurrent,
- CurrentLoad: worker.CurrentLoad,
- })
- }
- return result
- }
- // GetTaskScheduler returns the scheduler for a task type using dynamic mapping
- func (s *MaintenanceIntegration) GetTaskScheduler(taskType MaintenanceTaskType) types.TaskScheduler {
- // Convert task type using mapping
- taskSystemType, exists := s.revTaskTypeMap[taskType]
- if !exists {
- glog.V(3).Infof("Unknown task type %s for scheduler", taskType)
- return nil
- }
- return s.taskRegistry.GetScheduler(taskSystemType)
- }
- // GetUIProvider returns the UI provider for a task type using dynamic mapping
- func (s *MaintenanceIntegration) GetUIProvider(taskType MaintenanceTaskType) types.TaskUIProvider {
- // Convert task type using mapping
- taskSystemType, exists := s.revTaskTypeMap[taskType]
- if !exists {
- glog.V(3).Infof("Unknown task type %s for UI provider", taskType)
- return nil
- }
- return s.uiRegistry.GetProvider(taskSystemType)
- }
- // GetAllTaskStats returns stats for all registered tasks
- func (s *MaintenanceIntegration) GetAllTaskStats() []*types.TaskStats {
- var stats []*types.TaskStats
- for taskType, detector := range s.taskRegistry.GetAllDetectors() {
- uiProvider := s.uiRegistry.GetProvider(taskType)
- if uiProvider == nil {
- continue
- }
- stat := &types.TaskStats{
- TaskType: taskType,
- DisplayName: uiProvider.GetDisplayName(),
- Enabled: detector.IsEnabled(),
- LastScan: time.Now().Add(-detector.ScanInterval()),
- NextScan: time.Now().Add(detector.ScanInterval()),
- ScanInterval: detector.ScanInterval(),
- MaxConcurrent: s.taskRegistry.GetScheduler(taskType).GetMaxConcurrent(),
- // Would need to get these from actual queue/stats
- PendingTasks: 0,
- RunningTasks: 0,
- CompletedToday: 0,
- FailedToday: 0,
- }
- stats = append(stats, stat)
- }
- return stats
- }
- // mapMaintenanceTaskTypeToPendingOperationType converts a maintenance task type to a pending operation type
- func (s *MaintenanceIntegration) mapMaintenanceTaskTypeToPendingOperationType(taskType MaintenanceTaskType) PendingOperationType {
- switch taskType {
- case MaintenanceTaskType("balance"):
- return OpTypeVolumeBalance
- case MaintenanceTaskType("erasure_coding"):
- return OpTypeErasureCoding
- case MaintenanceTaskType("vacuum"):
- return OpTypeVacuum
- case MaintenanceTaskType("replication"):
- return OpTypeReplication
- default:
- // For other task types, assume they're volume operations
- return OpTypeVolumeMove
- }
- }
- // GetPendingOperations returns the pending operations tracker
- func (s *MaintenanceIntegration) GetPendingOperations() *PendingOperations {
- return s.pendingOperations
- }
- // GetActiveTopology returns the active topology for task detection
- func (s *MaintenanceIntegration) GetActiveTopology() *topology.ActiveTopology {
- return s.activeTopology
- }
|