// File: maintenance_integration.go

  1. package maintenance
  2. import (
  3. "time"
  4. "github.com/seaweedfs/seaweedfs/weed/admin/topology"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
  8. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  9. )
  10. // MaintenanceIntegration bridges the task system with existing maintenance
  11. type MaintenanceIntegration struct {
  12. taskRegistry *types.TaskRegistry
  13. uiRegistry *types.UIRegistry
  14. // Bridge to existing system
  15. maintenanceQueue *MaintenanceQueue
  16. maintenancePolicy *MaintenancePolicy
  17. // Pending operations tracker
  18. pendingOperations *PendingOperations
  19. // Active topology for task detection and target selection
  20. activeTopology *topology.ActiveTopology
  21. // Type conversion maps
  22. taskTypeMap map[types.TaskType]MaintenanceTaskType
  23. revTaskTypeMap map[MaintenanceTaskType]types.TaskType
  24. priorityMap map[types.TaskPriority]MaintenanceTaskPriority
  25. revPriorityMap map[MaintenanceTaskPriority]types.TaskPriority
  26. }
  27. // NewMaintenanceIntegration creates the integration bridge
  28. func NewMaintenanceIntegration(queue *MaintenanceQueue, policy *MaintenancePolicy) *MaintenanceIntegration {
  29. integration := &MaintenanceIntegration{
  30. taskRegistry: tasks.GetGlobalTypesRegistry(), // Use global types registry with auto-registered tasks
  31. uiRegistry: tasks.GetGlobalUIRegistry(), // Use global UI registry with auto-registered UI providers
  32. maintenanceQueue: queue,
  33. maintenancePolicy: policy,
  34. pendingOperations: NewPendingOperations(),
  35. }
  36. // Initialize active topology with 10 second recent task window
  37. integration.activeTopology = topology.NewActiveTopology(10)
  38. // Initialize type conversion maps
  39. integration.initializeTypeMaps()
  40. // Register all tasks
  41. integration.registerAllTasks()
  42. return integration
  43. }
  44. // initializeTypeMaps creates the type conversion maps for dynamic conversion
  45. func (s *MaintenanceIntegration) initializeTypeMaps() {
  46. // Initialize empty maps
  47. s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
  48. s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
  49. // Build task type mappings dynamically from registered tasks after registration
  50. // This will be called from registerAllTasks() after all tasks are registered
  51. // Priority mappings (these are static and don't depend on registered tasks)
  52. s.priorityMap = map[types.TaskPriority]MaintenanceTaskPriority{
  53. types.TaskPriorityLow: PriorityLow,
  54. types.TaskPriorityNormal: PriorityNormal,
  55. types.TaskPriorityHigh: PriorityHigh,
  56. }
  57. // Reverse priority mappings
  58. s.revPriorityMap = map[MaintenanceTaskPriority]types.TaskPriority{
  59. PriorityLow: types.TaskPriorityLow,
  60. PriorityNormal: types.TaskPriorityNormal,
  61. PriorityHigh: types.TaskPriorityHigh,
  62. PriorityCritical: types.TaskPriorityHigh, // Map critical to high
  63. }
  64. }
  65. // buildTaskTypeMappings dynamically builds task type mappings from registered tasks
  66. func (s *MaintenanceIntegration) buildTaskTypeMappings() {
  67. // Clear existing mappings
  68. s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
  69. s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
  70. // Build mappings from registered detectors
  71. for workerTaskType := range s.taskRegistry.GetAllDetectors() {
  72. // Convert types.TaskType to MaintenanceTaskType by string conversion
  73. maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
  74. s.taskTypeMap[workerTaskType] = maintenanceTaskType
  75. s.revTaskTypeMap[maintenanceTaskType] = workerTaskType
  76. glog.V(3).Infof("Dynamically mapped task type: %s <-> %s", workerTaskType, maintenanceTaskType)
  77. }
  78. glog.V(2).Infof("Built %d dynamic task type mappings", len(s.taskTypeMap))
  79. }
  80. // registerAllTasks registers all available tasks
  81. func (s *MaintenanceIntegration) registerAllTasks() {
  82. // Tasks are already auto-registered via import statements
  83. // No manual registration needed
  84. // Build dynamic type mappings from registered tasks
  85. s.buildTaskTypeMappings()
  86. // Configure tasks from policy
  87. s.ConfigureTasksFromPolicy()
  88. registeredTaskTypes := make([]string, 0, len(s.taskTypeMap))
  89. for _, maintenanceTaskType := range s.taskTypeMap {
  90. registeredTaskTypes = append(registeredTaskTypes, string(maintenanceTaskType))
  91. }
  92. glog.V(1).Infof("Registered tasks: %v", registeredTaskTypes)
  93. }
  94. // ConfigureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy
  95. func (s *MaintenanceIntegration) ConfigureTasksFromPolicy() {
  96. if s.maintenancePolicy == nil {
  97. return
  98. }
  99. // Configure all registered detectors and schedulers dynamically using policy configuration
  100. configuredCount := 0
  101. // Get all registered task types from the registry
  102. for taskType, detector := range s.taskRegistry.GetAllDetectors() {
  103. // Configure detector using policy-based configuration
  104. s.configureDetectorFromPolicy(taskType, detector)
  105. configuredCount++
  106. }
  107. for taskType, scheduler := range s.taskRegistry.GetAllSchedulers() {
  108. // Configure scheduler using policy-based configuration
  109. s.configureSchedulerFromPolicy(taskType, scheduler)
  110. }
  111. glog.V(1).Infof("Dynamically configured %d task types from maintenance policy", configuredCount)
  112. }
  113. // configureDetectorFromPolicy configures a detector using policy-based configuration
  114. func (s *MaintenanceIntegration) configureDetectorFromPolicy(taskType types.TaskType, detector types.TaskDetector) {
  115. // Try to configure using PolicyConfigurableDetector interface if supported
  116. if configurableDetector, ok := detector.(types.PolicyConfigurableDetector); ok {
  117. configurableDetector.ConfigureFromPolicy(s.maintenancePolicy)
  118. glog.V(2).Infof("Configured detector %s using policy interface", taskType)
  119. return
  120. }
  121. // Apply basic configuration that all detectors should support
  122. if basicDetector, ok := detector.(interface{ SetEnabled(bool) }); ok {
  123. // Convert task system type to maintenance task type for policy lookup
  124. maintenanceTaskType, exists := s.taskTypeMap[taskType]
  125. if exists {
  126. enabled := IsTaskEnabled(s.maintenancePolicy, maintenanceTaskType)
  127. basicDetector.SetEnabled(enabled)
  128. glog.V(3).Infof("Set enabled=%v for detector %s", enabled, taskType)
  129. }
  130. }
  131. // For detectors that don't implement PolicyConfigurableDetector interface,
  132. // they should be updated to implement it for full policy-based configuration
  133. glog.V(2).Infof("Detector %s should implement PolicyConfigurableDetector interface for full policy support", taskType)
  134. }
  135. // configureSchedulerFromPolicy configures a scheduler using policy-based configuration
  136. func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.TaskType, scheduler types.TaskScheduler) {
  137. // Try to configure using PolicyConfigurableScheduler interface if supported
  138. if configurableScheduler, ok := scheduler.(types.PolicyConfigurableScheduler); ok {
  139. configurableScheduler.ConfigureFromPolicy(s.maintenancePolicy)
  140. glog.V(2).Infof("Configured scheduler %s using policy interface", taskType)
  141. return
  142. }
  143. // Apply basic configuration that all schedulers should support
  144. maintenanceTaskType, exists := s.taskTypeMap[taskType]
  145. if !exists {
  146. glog.V(3).Infof("No maintenance task type mapping for %s, skipping configuration", taskType)
  147. return
  148. }
  149. // Set enabled status if scheduler supports it
  150. if enableableScheduler, ok := scheduler.(interface{ SetEnabled(bool) }); ok {
  151. enabled := IsTaskEnabled(s.maintenancePolicy, maintenanceTaskType)
  152. enableableScheduler.SetEnabled(enabled)
  153. glog.V(3).Infof("Set enabled=%v for scheduler %s", enabled, taskType)
  154. }
  155. // Set max concurrent if scheduler supports it
  156. if concurrentScheduler, ok := scheduler.(interface{ SetMaxConcurrent(int) }); ok {
  157. maxConcurrent := GetMaxConcurrent(s.maintenancePolicy, maintenanceTaskType)
  158. if maxConcurrent > 0 {
  159. concurrentScheduler.SetMaxConcurrent(maxConcurrent)
  160. glog.V(3).Infof("Set max concurrent=%d for scheduler %s", maxConcurrent, taskType)
  161. }
  162. }
  163. // For schedulers that don't implement PolicyConfigurableScheduler interface,
  164. // they should be updated to implement it for full policy-based configuration
  165. glog.V(2).Infof("Scheduler %s should implement PolicyConfigurableScheduler interface for full policy support", taskType)
  166. }
  167. // ScanWithTaskDetectors performs a scan using the task system
  168. func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.VolumeHealthMetrics) ([]*TaskDetectionResult, error) {
  169. // Note: ActiveTopology gets updated from topology info instead of volume metrics
  170. glog.V(2).Infof("Processed %d volume metrics for task detection", len(volumeMetrics))
  171. // Filter out volumes with pending operations to avoid duplicates
  172. filteredMetrics := s.pendingOperations.FilterVolumeMetricsExcludingPending(volumeMetrics)
  173. glog.V(1).Infof("Scanning %d volumes (filtered from %d) excluding pending operations",
  174. len(filteredMetrics), len(volumeMetrics))
  175. var allResults []*TaskDetectionResult
  176. // Create cluster info
  177. clusterInfo := &types.ClusterInfo{
  178. TotalVolumes: len(filteredMetrics),
  179. LastUpdated: time.Now(),
  180. ActiveTopology: s.activeTopology, // Provide ActiveTopology for destination planning
  181. }
  182. // Run detection for each registered task type
  183. for taskType, detector := range s.taskRegistry.GetAllDetectors() {
  184. if !detector.IsEnabled() {
  185. continue
  186. }
  187. glog.V(2).Infof("Running detection for task type: %s", taskType)
  188. results, err := detector.ScanForTasks(filteredMetrics, clusterInfo)
  189. if err != nil {
  190. glog.Errorf("Failed to scan for %s tasks: %v", taskType, err)
  191. continue
  192. }
  193. // Convert results to existing system format and check for conflicts
  194. for _, result := range results {
  195. existingResult := s.convertToExistingFormat(result)
  196. if existingResult != nil {
  197. // Double-check for conflicts with pending operations
  198. opType := s.mapMaintenanceTaskTypeToPendingOperationType(existingResult.TaskType)
  199. if !s.pendingOperations.WouldConflictWithPending(existingResult.VolumeID, opType) {
  200. // All task types should now have TypedParams populated during detection phase
  201. if existingResult.TypedParams == nil {
  202. glog.Warningf("Task %s for volume %d has no typed parameters - skipping (task parameter creation may have failed)",
  203. existingResult.TaskType, existingResult.VolumeID)
  204. continue
  205. }
  206. allResults = append(allResults, existingResult)
  207. } else {
  208. glog.V(2).Infof("Skipping task %s for volume %d due to conflict with pending operation",
  209. existingResult.TaskType, existingResult.VolumeID)
  210. }
  211. }
  212. }
  213. glog.V(2).Infof("Found %d %s tasks", len(results), taskType)
  214. }
  215. return allResults, nil
  216. }
  217. // UpdateTopologyInfo updates the volume shard tracker with topology information for empty servers
  218. func (s *MaintenanceIntegration) UpdateTopologyInfo(topologyInfo *master_pb.TopologyInfo) error {
  219. return s.activeTopology.UpdateTopology(topologyInfo)
  220. }
  221. // convertToExistingFormat converts task results to existing system format using dynamic mapping
  222. func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetectionResult) *TaskDetectionResult {
  223. // Convert types using mapping tables
  224. existingType, exists := s.taskTypeMap[result.TaskType]
  225. if !exists {
  226. glog.Warningf("Unknown task type %s, skipping conversion", result.TaskType)
  227. // Return nil to indicate conversion failed - caller should handle this
  228. return nil
  229. }
  230. existingPriority, exists := s.priorityMap[result.Priority]
  231. if !exists {
  232. glog.Warningf("Unknown priority %s, defaulting to normal", result.Priority)
  233. existingPriority = PriorityNormal
  234. }
  235. return &TaskDetectionResult{
  236. TaskType: existingType,
  237. VolumeID: result.VolumeID,
  238. Server: result.Server,
  239. Collection: result.Collection,
  240. Priority: existingPriority,
  241. Reason: result.Reason,
  242. TypedParams: result.TypedParams,
  243. ScheduleAt: result.ScheduleAt,
  244. }
  245. }
  246. // CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion
  247. func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool {
  248. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Checking task %s (type: %s)", task.ID, task.Type)
  249. // Convert existing types to task types using mapping
  250. taskType, exists := s.revTaskTypeMap[task.Type]
  251. if !exists {
  252. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Unknown task type %s for scheduling, falling back to existing logic", task.Type)
  253. return false // Fallback to existing logic for unknown types
  254. }
  255. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Mapped task type %s to %s", task.Type, taskType)
  256. // Convert task objects
  257. taskObject := s.convertTaskToTaskSystem(task)
  258. if taskObject == nil {
  259. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Failed to convert task %s for scheduling", task.ID)
  260. return false
  261. }
  262. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Successfully converted task %s", task.ID)
  263. runningTaskObjects := s.convertTasksToTaskSystem(runningTasks)
  264. workerObjects := s.convertWorkersToTaskSystem(availableWorkers)
  265. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Converted %d running tasks and %d workers", len(runningTaskObjects), len(workerObjects))
  266. // Get the appropriate scheduler
  267. scheduler := s.taskRegistry.GetScheduler(taskType)
  268. if scheduler == nil {
  269. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: No scheduler found for task type %s", taskType)
  270. return false
  271. }
  272. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Found scheduler for task type %s", taskType)
  273. canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
  274. glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Scheduler decision for task %s: %v", task.ID, canSchedule)
  275. return canSchedule
  276. }
  277. // convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
  278. func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.TaskInput {
  279. // Convert task type using mapping
  280. taskType, exists := s.revTaskTypeMap[task.Type]
  281. if !exists {
  282. glog.Errorf("Unknown task type %s in conversion, cannot convert task", task.Type)
  283. // Return nil to indicate conversion failed
  284. return nil
  285. }
  286. // Convert priority using mapping
  287. priority, exists := s.revPriorityMap[task.Priority]
  288. if !exists {
  289. glog.Warningf("Unknown priority %d in conversion, defaulting to normal", task.Priority)
  290. priority = types.TaskPriorityNormal
  291. }
  292. return &types.TaskInput{
  293. ID: task.ID,
  294. Type: taskType,
  295. Priority: priority,
  296. VolumeID: task.VolumeID,
  297. Server: task.Server,
  298. Collection: task.Collection,
  299. TypedParams: task.TypedParams,
  300. CreatedAt: task.CreatedAt,
  301. }
  302. }
  303. // convertTasksToTaskSystem converts multiple tasks
  304. func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.TaskInput {
  305. var result []*types.TaskInput
  306. for _, task := range tasks {
  307. converted := s.convertTaskToTaskSystem(task)
  308. if converted != nil {
  309. result = append(result, converted)
  310. }
  311. }
  312. return result
  313. }
  314. // convertWorkersToTaskSystem converts workers to task system format using dynamic mapping
  315. func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.WorkerData {
  316. var result []*types.WorkerData
  317. for _, worker := range workers {
  318. capabilities := make([]types.TaskType, 0, len(worker.Capabilities))
  319. for _, cap := range worker.Capabilities {
  320. // Convert capability using mapping
  321. taskType, exists := s.revTaskTypeMap[cap]
  322. if exists {
  323. capabilities = append(capabilities, taskType)
  324. } else {
  325. glog.V(3).Infof("Unknown capability %s for worker %s, skipping", cap, worker.ID)
  326. }
  327. }
  328. result = append(result, &types.WorkerData{
  329. ID: worker.ID,
  330. Address: worker.Address,
  331. Capabilities: capabilities,
  332. MaxConcurrent: worker.MaxConcurrent,
  333. CurrentLoad: worker.CurrentLoad,
  334. })
  335. }
  336. return result
  337. }
  338. // GetTaskScheduler returns the scheduler for a task type using dynamic mapping
  339. func (s *MaintenanceIntegration) GetTaskScheduler(taskType MaintenanceTaskType) types.TaskScheduler {
  340. // Convert task type using mapping
  341. taskSystemType, exists := s.revTaskTypeMap[taskType]
  342. if !exists {
  343. glog.V(3).Infof("Unknown task type %s for scheduler", taskType)
  344. return nil
  345. }
  346. return s.taskRegistry.GetScheduler(taskSystemType)
  347. }
  348. // GetUIProvider returns the UI provider for a task type using dynamic mapping
  349. func (s *MaintenanceIntegration) GetUIProvider(taskType MaintenanceTaskType) types.TaskUIProvider {
  350. // Convert task type using mapping
  351. taskSystemType, exists := s.revTaskTypeMap[taskType]
  352. if !exists {
  353. glog.V(3).Infof("Unknown task type %s for UI provider", taskType)
  354. return nil
  355. }
  356. return s.uiRegistry.GetProvider(taskSystemType)
  357. }
  358. // GetAllTaskStats returns stats for all registered tasks
  359. func (s *MaintenanceIntegration) GetAllTaskStats() []*types.TaskStats {
  360. var stats []*types.TaskStats
  361. for taskType, detector := range s.taskRegistry.GetAllDetectors() {
  362. uiProvider := s.uiRegistry.GetProvider(taskType)
  363. if uiProvider == nil {
  364. continue
  365. }
  366. stat := &types.TaskStats{
  367. TaskType: taskType,
  368. DisplayName: uiProvider.GetDisplayName(),
  369. Enabled: detector.IsEnabled(),
  370. LastScan: time.Now().Add(-detector.ScanInterval()),
  371. NextScan: time.Now().Add(detector.ScanInterval()),
  372. ScanInterval: detector.ScanInterval(),
  373. MaxConcurrent: s.taskRegistry.GetScheduler(taskType).GetMaxConcurrent(),
  374. // Would need to get these from actual queue/stats
  375. PendingTasks: 0,
  376. RunningTasks: 0,
  377. CompletedToday: 0,
  378. FailedToday: 0,
  379. }
  380. stats = append(stats, stat)
  381. }
  382. return stats
  383. }
  384. // mapMaintenanceTaskTypeToPendingOperationType converts a maintenance task type to a pending operation type
  385. func (s *MaintenanceIntegration) mapMaintenanceTaskTypeToPendingOperationType(taskType MaintenanceTaskType) PendingOperationType {
  386. switch taskType {
  387. case MaintenanceTaskType("balance"):
  388. return OpTypeVolumeBalance
  389. case MaintenanceTaskType("erasure_coding"):
  390. return OpTypeErasureCoding
  391. case MaintenanceTaskType("vacuum"):
  392. return OpTypeVacuum
  393. case MaintenanceTaskType("replication"):
  394. return OpTypeReplication
  395. default:
  396. // For other task types, assume they're volume operations
  397. return OpTypeVolumeMove
  398. }
  399. }
  400. // GetPendingOperations returns the pending operations tracker
  401. func (s *MaintenanceIntegration) GetPendingOperations() *PendingOperations {
  402. return s.pendingOperations
  403. }
  404. // GetActiveTopology returns the active topology for task detection
  405. func (s *MaintenanceIntegration) GetActiveTopology() *topology.ActiveTopology {
  406. return s.activeTopology
  407. }