maintenance_queue.go

package maintenance

import (
	"crypto/rand"
	"fmt"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// NewMaintenanceQueue creates a new maintenance queue
func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue {
	queue := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		workers:      make(map[string]*MaintenanceWorker),
		pendingTasks: make([]*MaintenanceTask, 0),
		policy:       policy,
	}
	return queue
}

// SetIntegration sets the integration reference
func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration) {
	mq.integration = integration
	glog.V(1).Infof("Maintenance queue configured with integration")
}

// SetPersistence sets the task persistence interface
func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) {
	mq.persistence = persistence
	glog.V(1).Infof("Maintenance queue configured with task persistence")
}

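// The TaskPersistence interface itself is defined elsewhere in this package;
// this queue only relies on the three methods it calls below. A minimal
// sketch of the expected shape, for orientation only (the real definition
// may differ):
//
//	type TaskPersistence interface {
//		LoadAllTaskStates() ([]*MaintenanceTask, error)
//		SaveTaskState(task *MaintenanceTask) error
//		CleanupCompletedTasks() error
//	}
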
// LoadTasksFromPersistence loads tasks from persistent storage on startup
func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
	if mq.persistence == nil {
		glog.V(1).Infof("No task persistence configured, skipping task loading")
		return nil
	}

	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	glog.Infof("Loading tasks from persistence...")
	tasks, err := mq.persistence.LoadAllTaskStates()
	if err != nil {
		return fmt.Errorf("failed to load task states: %w", err)
	}
	glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks))

	// Reset task maps
	mq.tasks = make(map[string]*MaintenanceTask)
	mq.pendingTasks = make([]*MaintenanceTask, 0)

	// Load tasks by status
	for _, task := range tasks {
		glog.Infof("DEBUG LoadTasksFromPersistence: Loading task %s (type: %s, status: %s, scheduled: %v)", task.ID, task.Type, task.Status, task.ScheduledAt)
		mq.tasks[task.ID] = task
		switch task.Status {
		case TaskStatusPending:
			glog.Infof("DEBUG LoadTasksFromPersistence: Adding task %s to pending queue", task.ID)
			mq.pendingTasks = append(mq.pendingTasks, task)
		case TaskStatusAssigned, TaskStatusInProgress:
			// For assigned/in-progress tasks, we need to check if the worker is still available.
			// If not, we should fail them and make them eligible for retry.
			if task.WorkerID != "" {
				if _, exists := mq.workers[task.WorkerID]; !exists {
					glog.Warningf("Task %s was assigned to unavailable worker %s, marking as failed", task.ID, task.WorkerID)
					task.Status = TaskStatusFailed
					task.Error = "Worker unavailable after restart"
					completedTime := time.Now()
					task.CompletedAt = &completedTime

					// Check if it should be retried
					if task.RetryCount < task.MaxRetries {
						task.RetryCount++
						task.Status = TaskStatusPending
						task.WorkerID = ""
						task.StartedAt = nil
						task.CompletedAt = nil
						task.Error = ""
						task.ScheduledAt = time.Now().Add(1 * time.Minute) // Retry after restart delay
						glog.Infof("DEBUG LoadTasksFromPersistence: Retrying task %s, adding to pending queue", task.ID)
						mq.pendingTasks = append(mq.pendingTasks, task)
					}
				}
			}
		}
	}

	// Sort pending tasks by priority and schedule time
	sort.Slice(mq.pendingTasks, func(i, j int) bool {
		if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
			return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
		}
		return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
	})

	glog.Infof("Loaded %d tasks from persistence (%d pending)", len(tasks), len(mq.pendingTasks))
	return nil
}

// saveTaskState saves a task to persistent storage
func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) {
	if mq.persistence != nil {
		if err := mq.persistence.SaveTaskState(task); err != nil {
			glog.Errorf("Failed to save task state for %s: %v", task.ID, err)
		}
	}
}

// cleanupCompletedTasks removes old completed tasks beyond the retention limit
func (mq *MaintenanceQueue) cleanupCompletedTasks() {
	if mq.persistence != nil {
		if err := mq.persistence.CleanupCompletedTasks(); err != nil {
			glog.Errorf("Failed to cleanup completed tasks: %v", err)
		}
	}
}

// AddTask adds a new maintenance task to the queue with deduplication
func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	// Check for duplicate tasks (same type + volume + not completed)
	if mq.hasDuplicateTask(task) {
		glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
			task.Type, task.VolumeID, task.Server)
		return
	}

	task.ID = generateTaskID()
	task.Status = TaskStatusPending
	task.CreatedAt = time.Now()
	task.MaxRetries = 3 // Default retry count

	// Initialize assignment history and set creation context
	task.AssignmentHistory = make([]*TaskAssignmentRecord, 0)
	if task.CreatedBy == "" {
		task.CreatedBy = "maintenance-system"
	}
	if task.CreationContext == "" {
		task.CreationContext = "Automatic task creation based on system monitoring"
	}
	if task.Tags == nil {
		task.Tags = make(map[string]string)
	}

	mq.tasks[task.ID] = task
	mq.pendingTasks = append(mq.pendingTasks, task)

	// Sort pending tasks by priority and schedule time
	sort.Slice(mq.pendingTasks, func(i, j int) bool {
		if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
			return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
		}
		return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
	})

	// Save task state to persistence
	mq.saveTaskState(task)

	scheduleInfo := ""
	if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
		scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
	}
	glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
		task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
}

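// Example (illustrative only): callers normally populate just the descriptive
// fields and let AddTask assign the ID, status, timestamps and retry budget.
// The priority value below is a placeholder; use whatever priority constants
// the surrounding package defines.
//
//	task := &MaintenanceTask{
//		Type:     MaintenanceTaskType("vacuum"),
//		Priority: 1, // placeholder priority
//		VolumeID: 42,
//		Server:   "volume-server-1:8080",
//		Reason:   "garbage ratio above threshold",
//	}
//	queue.AddTask(task)
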
// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
	for _, existingTask := range mq.tasks {
		if existingTask.Type == newTask.Type &&
			existingTask.VolumeID == newTask.VolumeID &&
			existingTask.Server == newTask.Server &&
			(existingTask.Status == TaskStatusPending ||
				existingTask.Status == TaskStatusAssigned ||
				existingTask.Status == TaskStatusInProgress) {
			return true
		}
	}
	return false
}

// AddTasksFromResults converts detection results to tasks and adds them to the queue
func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
	for _, result := range results {
		// Validate that task has proper typed parameters
		if result.TypedParams == nil {
			glog.Warningf("Rejecting invalid task: %s for volume %d on %s - no typed parameters (insufficient destinations or planning failed)",
				result.TaskType, result.VolumeID, result.Server)
			continue
		}
		task := &MaintenanceTask{
			Type:       result.TaskType,
			Priority:   result.Priority,
			VolumeID:   result.VolumeID,
			Server:     result.Server,
			Collection: result.Collection,
			// Copy typed protobuf parameters
			TypedParams: result.TypedParams,
			Reason:      result.Reason,
			ScheduledAt: result.ScheduleAt,
		}
		mq.AddTask(task)
	}
}

// GetNextTask returns the next available task for a worker
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
	// Use read lock for initial checks and search
	mq.mutex.RLock()
	worker, exists := mq.workers[workerID]
	if !exists {
		mq.mutex.RUnlock()
		glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
		return nil
	}

	// Check if worker has capacity
	if worker.CurrentLoad >= worker.MaxConcurrent {
		mq.mutex.RUnlock()
		glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
		return nil
	}

	now := time.Now()
	var selectedTask *MaintenanceTask
	selectedIndex := -1

	// Find the next suitable task (using read lock)
	for i, task := range mq.pendingTasks {
		// Check if it's time to execute the task
		if task.ScheduledAt.After(now) {
			glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
			continue
		}

		// Check if worker can handle this task type
		if !mq.workerCanHandle(task.Type, capabilities) {
			glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
			continue
		}

		// Check if this task type needs a cooldown period
		if !mq.canScheduleTaskNow(task) {
			// Add detailed diagnostic information
			runningCount := mq.GetRunningTaskCount(task.Type)
			maxConcurrent := mq.getMaxConcurrentForTaskType(task.Type)
			glog.V(2).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met (running: %d, max: %d)",
				task.ID, task.Type, workerID, runningCount, maxConcurrent)
			continue
		}

		// Found a suitable task
		selectedTask = task
		selectedIndex = i
		break
	}

	// Release read lock
	mq.mutex.RUnlock()

	// If no task found, return nil
	if selectedTask == nil {
		glog.V(2).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks))
		return nil
	}

	// Now acquire write lock to actually assign the task
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	// Re-check that the task is still available (it might have been assigned to another worker)
	if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
		glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
		return nil
	}

	// Record assignment history
	workerAddress := ""
	if worker, exists := mq.workers[workerID]; exists {
		workerAddress = worker.Address
	}

	// Create assignment record
	assignmentRecord := &TaskAssignmentRecord{
		WorkerID:      workerID,
		WorkerAddress: workerAddress,
		AssignedAt:    now,
		Reason:        "Task assigned to available worker",
	}

	// Initialize assignment history if nil
	if selectedTask.AssignmentHistory == nil {
		selectedTask.AssignmentHistory = make([]*TaskAssignmentRecord, 0)
	}
	selectedTask.AssignmentHistory = append(selectedTask.AssignmentHistory, assignmentRecord)

	// Assign the task
	selectedTask.Status = TaskStatusAssigned
	selectedTask.WorkerID = workerID
	selectedTask.StartedAt = &now

	// Remove from pending tasks
	mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)

	// Update worker load
	if worker, exists := mq.workers[workerID]; exists {
		worker.CurrentLoad++
	}

	// Track pending operation
	mq.trackPendingOperation(selectedTask)

	// Save task state after assignment
	mq.saveTaskState(selectedTask)

	glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
		selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
	return selectedTask
}

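// A minimal loop driving assignment and completion might look like the sketch
// below. Only GetNextTask and CompleteTask come from this file; the executor,
// poll interval and error-to-string conversion are illustrative:
//
//	for {
//		task := queue.GetNextTask(workerID, capabilities)
//		if task == nil {
//			time.Sleep(5 * time.Second) // illustrative poll interval
//			continue
//		}
//		errMsg := ""
//		if err := runTask(task); err != nil { // runTask is hypothetical
//			errMsg = err.Error()
//		}
//		queue.CompleteTask(task.ID, errMsg)
//	}
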
// CompleteTask marks a task as completed, or failed when errMsg is non-empty
func (mq *MaintenanceQueue) CompleteTask(taskID string, errMsg string) {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	task, exists := mq.tasks[taskID]
	if !exists {
		glog.Warningf("Attempted to complete non-existent task: %s", taskID)
		return
	}

	// Capture the worker that ran this task before the retry path clears it,
	// so the worker's load can still be decremented and logs stay accurate.
	workerID := task.WorkerID

	completedTime := time.Now()
	task.CompletedAt = &completedTime

	// Calculate task duration
	var duration time.Duration
	if task.StartedAt != nil {
		duration = completedTime.Sub(*task.StartedAt)
	}

	if errMsg != "" {
		task.Status = TaskStatusFailed
		task.Error = errMsg

		// Check if task should be retried
		if task.RetryCount < task.MaxRetries {
			// Record unassignment due to failure/retry
			if workerID != "" && len(task.AssignmentHistory) > 0 {
				lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
				if lastAssignment.UnassignedAt == nil {
					unassignedTime := completedTime
					lastAssignment.UnassignedAt = &unassignedTime
					lastAssignment.Reason = fmt.Sprintf("Task failed, scheduling retry (attempt %d/%d): %s",
						task.RetryCount+1, task.MaxRetries, errMsg)
				}
			}

			task.RetryCount++
			task.Status = TaskStatusPending
			task.WorkerID = ""
			task.StartedAt = nil
			task.CompletedAt = nil
			task.Error = ""
			task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
			mq.pendingTasks = append(mq.pendingTasks, task)

			// Save task state after retry setup
			mq.saveTaskState(task)

			glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
				taskID, task.Type, task.RetryCount, task.MaxRetries, workerID, duration, errMsg)
		} else {
			// Record unassignment due to permanent failure
			if workerID != "" && len(task.AssignmentHistory) > 0 {
				lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
				if lastAssignment.UnassignedAt == nil {
					unassignedTime := completedTime
					lastAssignment.UnassignedAt = &unassignedTime
					lastAssignment.Reason = fmt.Sprintf("Task failed permanently after %d retries: %s", task.MaxRetries, errMsg)
				}
			}

			// Save task state after permanent failure
			mq.saveTaskState(task)

			glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
				taskID, task.Type, workerID, duration, task.MaxRetries, errMsg)
		}
	} else {
		task.Status = TaskStatusCompleted
		task.Progress = 100

		// Save task state after successful completion
		mq.saveTaskState(task)

		glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
			taskID, task.Type, workerID, duration, task.VolumeID)
	}

	// Update the worker captured above (the retry path clears task.WorkerID)
	if workerID != "" {
		if worker, exists := mq.workers[workerID]; exists {
			worker.CurrentTask = nil
			worker.CurrentLoad--
			if worker.CurrentLoad == 0 {
				worker.Status = "active"
			}
		}
	}

	// Remove pending operation (unless it's being retried)
	if task.Status != TaskStatusPending {
		mq.removePendingOperation(taskID)
	}

	// Periodically cleanup old completed tasks (every 10th completion)
	if task.Status == TaskStatusCompleted {
		// Simple counter-based trigger for cleanup
		if len(mq.tasks)%10 == 0 {
			go mq.cleanupCompletedTasks()
		}
	}
}

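// Retry behaviour in one place: AddTask seeds MaxRetries with 3, a failed task
// is re-queued with a 15 minute delay until that budget is exhausted, and
// tasks recovered after a restart (see LoadTasksFromPersistence) are re-queued
// with a shorter 1 minute delay.
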
// UpdateTaskProgress updates the progress of a running task
func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
	// Take the write lock: this method mutates task fields (Progress, Status).
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	if task, exists := mq.tasks[taskID]; exists {
		oldProgress := task.Progress
		task.Progress = progress
		task.Status = TaskStatusInProgress

		// Update pending operation status
		mq.updatePendingOperationStatus(taskID, "in_progress")

		// Log progress at significant milestones or changes
		if progress == 0 {
			glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
				taskID, task.Type, task.WorkerID, task.VolumeID)
		} else if progress >= 100 {
			glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
				taskID, task.Type, task.WorkerID, progress)
		} else if progress-oldProgress >= 25 { // Log every 25% increment
			glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
				taskID, task.Type, task.WorkerID, progress)
		}

		// Save task state after progress update
		if progress == 0 || progress >= 100 || progress-oldProgress >= 10 {
			mq.saveTaskState(task)
		}
	} else {
		glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
	}
}

// RegisterWorker registers a new worker
func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	isNewWorker := true
	if existingWorker, exists := mq.workers[worker.ID]; exists {
		isNewWorker = false
		glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
			worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
		// Preserve current load when reconnecting
		worker.CurrentLoad = existingWorker.CurrentLoad
	} else {
		glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
			worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
	}

	worker.LastHeartbeat = time.Now()
	worker.Status = "active"
	if isNewWorker {
		worker.CurrentLoad = 0
	}
	mq.workers[worker.ID] = worker
}

// UpdateWorkerHeartbeat updates worker heartbeat
func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	if worker, exists := mq.workers[workerID]; exists {
		lastSeen := worker.LastHeartbeat
		worker.LastHeartbeat = time.Now()
		// Log if worker was offline for a while
		if time.Since(lastSeen) > 2*time.Minute {
			glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen))
		}
	} else {
		glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
	}
}

// GetRunningTaskCount returns the number of running tasks of a specific type
func (mq *MaintenanceQueue) GetRunningTaskCount(taskType MaintenanceTaskType) int {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()

	count := 0
	for _, task := range mq.tasks {
		if task.Type == taskType && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
			count++
		}
	}
	return count
}

// WasTaskRecentlyCompleted checks if a similar task was recently completed
func (mq *MaintenanceQueue) WasTaskRecentlyCompleted(taskType MaintenanceTaskType, volumeID uint32, server string, now time.Time) bool {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()

	// Get the repeat prevention interval for this task type
	interval := mq.getRepeatPreventionInterval(taskType)
	cutoff := now.Add(-interval)

	for _, task := range mq.tasks {
		if task.Type == taskType &&
			task.VolumeID == volumeID &&
			task.Server == server &&
			task.Status == TaskStatusCompleted &&
			task.CompletedAt != nil &&
			task.CompletedAt.After(cutoff) {
			return true
		}
	}
	return false
}

// getRepeatPreventionInterval returns the interval for preventing task repetition
func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTaskType) time.Duration {
	// First try to get the default from the task scheduler
	if mq.integration != nil {
		if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
			defaultInterval := scheduler.GetDefaultRepeatInterval()
			if defaultInterval > 0 {
				glog.V(3).Infof("Using task scheduler default repeat interval for %s: %v", taskType, defaultInterval)
				return defaultInterval
			}
		}
	}

	// Fall back to policy configuration if no scheduler is available or the scheduler provides no default
	if mq.policy != nil {
		repeatIntervalHours := GetRepeatInterval(mq.policy, taskType)
		if repeatIntervalHours > 0 {
			interval := time.Duration(repeatIntervalHours) * time.Hour
			glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
			return interval
		}
	}

	// Ultimate fallback - minimal safe default
	glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1h", taskType)
	return time.Hour
}

// GetTasks returns tasks with optional filtering
func (mq *MaintenanceQueue) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()

	var tasks []*MaintenanceTask
	for _, task := range mq.tasks {
		if status != "" && task.Status != status {
			continue
		}
		if taskType != "" && task.Type != taskType {
			continue
		}
		tasks = append(tasks, task)
		if limit > 0 && len(tasks) >= limit {
			break
		}
	}

	// Sort by creation time (newest first)
	sort.Slice(tasks, func(i, j int) bool {
		return tasks[i].CreatedAt.After(tasks[j].CreatedAt)
	})
	return tasks
}

// GetWorkers returns all registered workers
func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()

	var workers []*MaintenanceWorker
	for _, worker := range mq.workers {
		workers = append(workers, worker)
	}
	return workers
}

// generateTaskID generates a unique ID for tasks
func generateTaskID() string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
	b := make([]byte, 8)
	randBytes := make([]byte, 8)

	// Generate random bytes
	if _, err := rand.Read(randBytes); err != nil {
		// Fallback to timestamp-based ID if crypto/rand fails
		timestamp := time.Now().UnixNano()
		return fmt.Sprintf("task-%d", timestamp)
	}

	// Convert random bytes to charset
	for i := range b {
		b[i] = charset[int(randBytes[i])%len(charset)]
	}

	// Add timestamp suffix to ensure uniqueness
	timestamp := time.Now().Unix() % 10000 // last 4 digits of timestamp
	return fmt.Sprintf("%s-%04d", string(b), timestamp)
}

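// Example IDs: the normal path yields values such as "k3x9m2pq-0042" (eight
// lowercase alphanumeric characters plus a four-digit timestamp suffix), while
// the crypto/rand fallback yields "task-<unix-nanoseconds>".
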
// CleanupOldTasks removes old completed and failed tasks
func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	cutoff := time.Now().Add(-retention)
	removed := 0
	for id, task := range mq.tasks {
		if (task.Status == TaskStatusCompleted || task.Status == TaskStatusFailed) &&
			task.CompletedAt != nil &&
			task.CompletedAt.Before(cutoff) {
			delete(mq.tasks, id)
			removed++
		}
	}
	glog.V(2).Infof("Cleaned up %d old maintenance tasks", removed)
	return removed
}

// RemoveStaleWorkers removes workers that haven't sent a heartbeat recently
func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()

	cutoff := time.Now().Add(-timeout)
	removed := 0
	for id, worker := range mq.workers {
		if worker.LastHeartbeat.Before(cutoff) {
			// Mark any assigned tasks as failed and record unassignment
			for _, task := range mq.tasks {
				if task.WorkerID == id && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
					// Record unassignment due to worker becoming unavailable
					if len(task.AssignmentHistory) > 0 {
						lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
						if lastAssignment.UnassignedAt == nil {
							unassignedTime := time.Now()
							lastAssignment.UnassignedAt = &unassignedTime
							lastAssignment.Reason = "Worker became unavailable (stale heartbeat)"
						}
					}

					task.Status = TaskStatusFailed
					task.Error = "Worker became unavailable"
					completedTime := time.Now()
					task.CompletedAt = &completedTime
				}
			}

			delete(mq.workers, id)
			removed++
			glog.Warningf("Removed stale maintenance worker %s", id)
		}
	}
	return removed
}

// GetStats returns maintenance statistics
func (mq *MaintenanceQueue) GetStats() *MaintenanceStats {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()

	stats := &MaintenanceStats{
		TotalTasks:    len(mq.tasks),
		TasksByStatus: make(map[MaintenanceTaskStatus]int),
		TasksByType:   make(map[MaintenanceTaskType]int),
		ActiveWorkers: 0,
	}

	today := time.Now().Truncate(24 * time.Hour)
	var totalDuration time.Duration
	var completedTasks int

	for _, task := range mq.tasks {
		stats.TasksByStatus[task.Status]++
		stats.TasksByType[task.Type]++

		if task.CompletedAt != nil && task.CompletedAt.After(today) {
			if task.Status == TaskStatusCompleted {
				stats.CompletedToday++
			} else if task.Status == TaskStatusFailed {
				stats.FailedToday++
			}
			if task.StartedAt != nil {
				duration := task.CompletedAt.Sub(*task.StartedAt)
				totalDuration += duration
				completedTasks++
			}
		}
	}

	for _, worker := range mq.workers {
		if worker.Status == "active" || worker.Status == "busy" {
			stats.ActiveWorkers++
		}
	}

	if completedTasks > 0 {
		stats.AverageTaskTime = totalDuration / time.Duration(completedTasks)
	}
	return stats
}

// workerCanHandle checks if a worker can handle a specific task type
func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabilities []MaintenanceTaskType) bool {
	for _, capability := range capabilities {
		if capability == taskType {
			return true
		}
	}
	return false
}

// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
	glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)

	// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
	// Use fallback logic directly for now
	glog.V(2).Infof("Using fallback logic for task scheduling")
	canExecute := mq.canExecuteTaskType(task.Type)
	glog.V(2).Infof("Fallback decision for task %s: %v", task.ID, canExecute)
	return canExecute

	// NOTE: Original integration code disabled temporarily
	// Try task scheduling logic first
	/*
		if mq.integration != nil {
			glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
			// Get all running tasks and available workers
			runningTasks := mq.getRunningTasks()
			availableWorkers := mq.getAvailableWorkers()
			glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
			canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
			glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
			return canSchedule
		}
	*/
}

// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic
func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) bool {
	runningCount := mq.GetRunningTaskCount(taskType)
	maxConcurrent := mq.getMaxConcurrentForTaskType(taskType)
	canExecute := runningCount < maxConcurrent
	glog.V(3).Infof("canExecuteTaskType for %s: running=%d, max=%d, canExecute=%v", taskType, runningCount, maxConcurrent, canExecute)
	return canExecute
}

// getMaxConcurrentForTaskType returns the maximum concurrent tasks allowed for a task type
func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTaskType) int {
	// First try to get the default from the task scheduler
	if mq.integration != nil {
		if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
			maxConcurrent := scheduler.GetMaxConcurrent()
			if maxConcurrent > 0 {
				glog.V(3).Infof("Using task scheduler max concurrent for %s: %d", taskType, maxConcurrent)
				return maxConcurrent
			}
		}
	}

	// Fall back to policy configuration if no scheduler is available or the scheduler provides no default
	if mq.policy != nil {
		maxConcurrent := GetMaxConcurrent(mq.policy, taskType)
		if maxConcurrent > 0 {
			glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
			return maxConcurrent
		}
	}

	// Ultimate fallback - minimal safe default
	glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1", taskType)
	return 1
}

// getRunningTasks returns all currently running tasks
func (mq *MaintenanceQueue) getRunningTasks() []*MaintenanceTask {
	var runningTasks []*MaintenanceTask
	for _, task := range mq.tasks {
		if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
			runningTasks = append(runningTasks, task)
		}
	}
	return runningTasks
}

// getAvailableWorkers returns all workers that can take more work
func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
	var availableWorkers []*MaintenanceWorker
	for _, worker := range mq.workers {
		if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
			availableWorkers = append(availableWorkers, worker)
		}
	}
	return availableWorkers
}

// trackPendingOperation adds a task to the pending operations tracker
func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
	if mq.integration == nil {
		return
	}
	pendingOps := mq.integration.GetPendingOperations()
	if pendingOps == nil {
		return
	}

	// Skip tracking for tasks without proper typed parameters
	if task.TypedParams == nil {
		glog.V(2).Infof("Skipping pending operation tracking for task %s - no typed parameters", task.ID)
		return
	}

	// Map maintenance task type to pending operation type
	var opType PendingOperationType
	switch task.Type {
	case MaintenanceTaskType("balance"):
		opType = OpTypeVolumeBalance
	case MaintenanceTaskType("erasure_coding"):
		opType = OpTypeErasureCoding
	case MaintenanceTaskType("vacuum"):
		opType = OpTypeVacuum
	case MaintenanceTaskType("replication"):
		opType = OpTypeReplication
	default:
		opType = OpTypeVolumeMove
	}

	// Determine destination node and estimated size from unified targets
	destNode := ""
	estimatedSize := uint64(1024 * 1024 * 1024) // Default 1GB estimate

	// Use unified targets array - the only source of truth
	if len(task.TypedParams.Targets) > 0 {
		destNode = task.TypedParams.Targets[0].Node
		if task.TypedParams.Targets[0].EstimatedSize > 0 {
			estimatedSize = task.TypedParams.Targets[0].EstimatedSize
		}
	}

	// Determine source node from unified sources
	sourceNode := ""
	if len(task.TypedParams.Sources) > 0 {
		sourceNode = task.TypedParams.Sources[0].Node
	}

	operation := &PendingOperation{
		VolumeID:      task.VolumeID,
		OperationType: opType,
		SourceNode:    sourceNode,
		DestNode:      destNode,
		TaskID:        task.ID,
		StartTime:     time.Now(),
		EstimatedSize: estimatedSize,
		Collection:    task.Collection,
		Status:        "assigned",
	}

	pendingOps.AddOperation(operation)
}

// removePendingOperation removes a task from the pending operations tracker
func (mq *MaintenanceQueue) removePendingOperation(taskID string) {
	if mq.integration == nil {
		return
	}
	pendingOps := mq.integration.GetPendingOperations()
	if pendingOps == nil {
		return
	}
	pendingOps.RemoveOperation(taskID)
}

// updatePendingOperationStatus updates the status of a pending operation
func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status string) {
	if mq.integration == nil {
		return
	}
	pendingOps := mq.integration.GetPendingOperations()
	if pendingOps == nil {
		return
	}
	pendingOps.UpdateOperationStatus(taskID, status)
}