maintenance_manager.go

package maintenance

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
	policy := &worker_pb.MaintenancePolicy{
		GlobalMaxConcurrent:          4,
		DefaultRepeatIntervalSeconds: 6 * 3600,  // 6 hours in seconds
		DefaultCheckIntervalSeconds:  12 * 3600, // 12 hours in seconds
		TaskPolicies:                 make(map[string]*worker_pb.TaskPolicy),
	}

	// Load vacuum task configuration
	if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
		policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
			Enabled:               vacuumConfig.Enabled,
			MaxConcurrent:         int32(vacuumConfig.MaxConcurrent),
			RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
			CheckIntervalSeconds:  int32(vacuumConfig.ScanIntervalSeconds),
			TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
				VacuumConfig: &worker_pb.VacuumTaskConfig{
					GarbageThreshold:   float64(vacuumConfig.GarbageThreshold),
					MinVolumeAgeHours:  int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
					MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
				},
			},
		}
	}

	// Load erasure coding task configuration
	if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
		policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
			Enabled:               ecConfig.Enabled,
			MaxConcurrent:         int32(ecConfig.MaxConcurrent),
			RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
			CheckIntervalSeconds:  int32(ecConfig.ScanIntervalSeconds),
			TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
				ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
					FullnessRatio:    float64(ecConfig.FullnessRatio),
					QuietForSeconds:  int32(ecConfig.QuietForSeconds),
					MinVolumeSizeMb:  int32(ecConfig.MinSizeMB),
					CollectionFilter: ecConfig.CollectionFilter,
				},
			},
		}
	}

	// Load balance task configuration
	if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
		policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
			Enabled:               balanceConfig.Enabled,
			MaxConcurrent:         int32(balanceConfig.MaxConcurrent),
			RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
			CheckIntervalSeconds:  int32(balanceConfig.ScanIntervalSeconds),
			TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
				BalanceConfig: &worker_pb.BalanceTaskConfig{
					ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
					MinServerCount:     int32(balanceConfig.MinServerCount),
				},
			},
		}
	}

	glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
	return policy
}
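
// For example, a vacuum configuration with ScanIntervalSeconds of 7200 produces
// a "vacuum" policy whose RepeatIntervalSeconds and CheckIntervalSeconds are
// both 7200, since each task's scan interval is reused for both fields above.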

// MaintenanceManager coordinates the maintenance system
type MaintenanceManager struct {
	config      *MaintenanceConfig
	scanner     *MaintenanceScanner
	queue       *MaintenanceQueue
	adminClient AdminClient
	running     bool
	stopChan    chan struct{}

	// Error handling and backoff
	errorCount     int
	lastError      error
	lastErrorTime  time.Time
	backoffDelay   time.Duration
	mutex          sync.RWMutex
	scanInProgress bool
}

// NewMaintenanceManager creates a new maintenance manager
func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
	if config == nil {
		config = DefaultMaintenanceConfig()
	}

	// Use the policy from the config (which is populated from separate task files in LoadMaintenanceConfig)
	policy := config.Policy
	if policy == nil {
		// Fallback: build the policy from separate task configuration files if not already populated
		policy = buildPolicyFromTaskConfigs()
	}

	queue := NewMaintenanceQueue(policy)
	scanner := NewMaintenanceScanner(adminClient, policy, queue)

	return &MaintenanceManager{
		config:       config,
		scanner:      scanner,
		queue:        queue,
		adminClient:  adminClient,
		stopChan:     make(chan struct{}),
		backoffDelay: time.Second, // Start with 1 second backoff
	}
}
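
// Illustrative usage sketch (not part of this file): wiring up a manager with an
// assumed AdminClient implementation named adminClient and the default
// configuration, then shutting it down on exit:
//
//	mgr := NewMaintenanceManager(adminClient, DefaultMaintenanceConfig())
//	if err := mgr.Start(); err != nil {
//		glog.Fatalf("failed to start maintenance manager: %v", err)
//	}
//	defer mgr.Stop()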

// Start begins the maintenance manager
func (mm *MaintenanceManager) Start() error {
	if !mm.config.Enabled {
		glog.V(1).Infof("Maintenance system is disabled")
		return nil
	}

	// Validate configuration durations to prevent ticker panics
	if err := mm.validateConfig(); err != nil {
		return fmt.Errorf("invalid maintenance configuration: %w", err)
	}

	// Mark as running under the mutex so the background loops observe it safely
	mm.mutex.Lock()
	mm.running = true
	mm.mutex.Unlock()

	// Start background processes
	go mm.scanLoop()
	go mm.cleanupLoop()

	glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
	return nil
}

// validateConfig validates the maintenance configuration durations
func (mm *MaintenanceManager) validateConfig() error {
	if mm.config.ScanIntervalSeconds <= 0 {
		glog.Warningf("Invalid scan interval %ds, using default 30m", mm.config.ScanIntervalSeconds)
		mm.config.ScanIntervalSeconds = 30 * 60 // 30 minutes in seconds
	}
	if mm.config.CleanupIntervalSeconds <= 0 {
		glog.Warningf("Invalid cleanup interval %ds, using default 24h", mm.config.CleanupIntervalSeconds)
		mm.config.CleanupIntervalSeconds = 24 * 60 * 60 // 24 hours in seconds
	}
	if mm.config.WorkerTimeoutSeconds <= 0 {
		glog.Warningf("Invalid worker timeout %ds, using default 5m", mm.config.WorkerTimeoutSeconds)
		mm.config.WorkerTimeoutSeconds = 5 * 60 // 5 minutes in seconds
	}
	if mm.config.TaskTimeoutSeconds <= 0 {
		glog.Warningf("Invalid task timeout %ds, using default 2h", mm.config.TaskTimeoutSeconds)
		mm.config.TaskTimeoutSeconds = 2 * 60 * 60 // 2 hours in seconds
	}
	if mm.config.RetryDelaySeconds <= 0 {
		glog.Warningf("Invalid retry delay %ds, using default 15m", mm.config.RetryDelaySeconds)
		mm.config.RetryDelaySeconds = 15 * 60 // 15 minutes in seconds
	}
	if mm.config.TaskRetentionSeconds <= 0 {
		glog.Warningf("Invalid task retention %ds, using default 168h", mm.config.TaskRetentionSeconds)
		mm.config.TaskRetentionSeconds = 7 * 24 * 60 * 60 // 7 days in seconds
	}
	return nil
}

// IsRunning returns whether the maintenance manager is currently running
func (mm *MaintenanceManager) IsRunning() bool {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()
	return mm.running
}

// Stop terminates the maintenance manager
func (mm *MaintenanceManager) Stop() {
	mm.mutex.Lock()
	mm.running = false
	mm.mutex.Unlock()
	close(mm.stopChan)
	glog.Infof("Maintenance manager stopped")
}

// scanLoop periodically scans for maintenance tasks with adaptive timing
func (mm *MaintenanceManager) scanLoop() {
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	tickerInterval := scanInterval
	ticker := time.NewTicker(tickerInterval)
	defer ticker.Stop()

	for mm.IsRunning() {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			glog.V(1).Infof("Performing maintenance scan every %v", tickerInterval)
			// Use the same synchronization as TriggerScan to prevent concurrent scans
			if err := mm.triggerScanInternal(false); err != nil {
				glog.V(1).Infof("Scheduled scan skipped: %v", err)
			}

			// Adjust the ticker interval based on the error state (read safely under the lock)
			currentInterval := mm.getScanInterval(scanInterval)

			// Reset the ticker whenever the desired interval changes, including back
			// to the configured interval once errors clear
			if currentInterval != tickerInterval {
				ticker.Stop()
				ticker = time.NewTicker(currentInterval)
				tickerInterval = currentInterval
			}
		}
	}
}

// getScanInterval safely reads the current scan interval with error backoff
func (mm *MaintenanceManager) getScanInterval(baseInterval time.Duration) time.Duration {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	if mm.errorCount > 0 {
		// Use the backoff delay when there are errors
		currentInterval := mm.backoffDelay
		if currentInterval > baseInterval {
			// Don't make it longer than the configured interval * 10
			maxInterval := baseInterval * 10
			if currentInterval > maxInterval {
				currentInterval = maxInterval
			}
		}
		return currentInterval
	}
	return baseInterval
}
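
// For example, with a 30-minute scan interval and three consecutive scan
// failures (backoffDelay doubled to 4s), the next scheduled scan fires after 4s;
// once a scan succeeds, resetErrorTracking clears the error count and the loop
// returns to the configured 30-minute interval.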

// cleanupLoop periodically cleans up old tasks and stale workers
func (mm *MaintenanceManager) cleanupLoop() {
	cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	for mm.IsRunning() {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			mm.performCleanup()
		}
	}
}

// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
	defer func() {
		// Always reset the scan-in-progress flag when done
		mm.mutex.Lock()
		mm.scanInProgress = false
		mm.mutex.Unlock()
	}()

	glog.Infof("Starting maintenance scan...")

	results, err := mm.scanner.ScanForMaintenanceTasks()
	if err != nil {
		// Handle scan error
		mm.mutex.Lock()
		mm.handleScanError(err)
		mm.mutex.Unlock()
		glog.Warningf("Maintenance scan failed: %v", err)
		return
	}

	// Scan succeeded - update state and process results
	mm.handleScanSuccess(results)
}

// handleScanSuccess processes successful scan results with proper lock management
func (mm *MaintenanceManager) handleScanSuccess(results []*TaskDetectionResult) {
	// Update manager state first
	mm.mutex.Lock()
	mm.resetErrorTracking()
	taskCount := len(results)
	mm.mutex.Unlock()

	if taskCount > 0 {
		// Count tasks by type for logging (outside of the lock)
		taskCounts := make(map[MaintenanceTaskType]int)
		for _, result := range results {
			taskCounts[result.TaskType]++
		}

		// Add tasks to the queue (no manager lock held)
		mm.queue.AddTasksFromResults(results)

		// Log detailed scan results
		glog.Infof("Maintenance scan completed: found %d tasks", taskCount)
		for taskType, count := range taskCounts {
			glog.Infof("  - %s: %d tasks", taskType, count)
		}
	} else {
		glog.Infof("Maintenance scan completed: no maintenance tasks needed")
	}
}

// handleScanError handles scan errors with exponential backoff and reduced logging
func (mm *MaintenanceManager) handleScanError(err error) {
	now := time.Now()
	mm.errorCount++
	mm.lastError = err
	mm.lastErrorTime = now

	// Use exponential backoff (doubling), capped at 5 minutes
	if mm.errorCount > 1 {
		mm.backoffDelay = mm.backoffDelay * 2
		if mm.backoffDelay > 5*time.Minute {
			mm.backoffDelay = 5 * time.Minute
		}
	}

	// Reduce log frequency based on error count and time
	shouldLog := false
	if mm.errorCount <= 3 {
		// Log the first 3 errors immediately
		shouldLog = true
	} else if mm.errorCount <= 10 && mm.errorCount%3 == 0 {
		// Log every 3rd error for errors 4-10
		shouldLog = true
	} else if mm.errorCount%10 == 0 {
		// Log every 10th error after that
		shouldLog = true
	}

	if shouldLog {
		// Check whether it's a connection error to provide better messaging
		if isConnectionError(err) {
			if mm.errorCount == 1 {
				glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
			} else {
				glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
					mm.errorCount, err, mm.backoffDelay)
			}
		} else {
			glog.Errorf("Maintenance scan failed: %v", err)
		}
	} else {
		// Use debug level for suppressed errors
		glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
	}
}
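
// In effect, the backoff delay grows 1s, 2s, 4s, ... up to the 5-minute cap,
// while logging thins out as errors accumulate: errors #1-#3 are always logged,
// only #6 and #9 are logged in the 4-10 range, and beyond that only every 10th
// error (#20, #30, ...) reaches the error log; the rest go to glog.V(3).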

// resetErrorTracking resets error tracking when a scan succeeds
func (mm *MaintenanceManager) resetErrorTracking() {
	if mm.errorCount > 0 {
		glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
		mm.errorCount = 0
		mm.lastError = nil
		mm.backoffDelay = time.Second // Reset to the initial delay
	}
}

// isConnectionError checks whether the error is a connection-related error
func isConnectionError(err error) bool {
	if err == nil {
		return false
	}
	errStr := err.Error()
	return strings.Contains(errStr, "connection refused") ||
		strings.Contains(errStr, "connection error") ||
		strings.Contains(errStr, "dial tcp") ||
		strings.Contains(errStr, "connection timeout") ||
		strings.Contains(errStr, "no route to host") ||
		strings.Contains(errStr, "network unreachable")
}

// performCleanup cleans up old tasks and stale workers
func (mm *MaintenanceManager) performCleanup() {
	glog.V(2).Infof("Starting maintenance cleanup")

	taskRetention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
	workerTimeout := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second

	removedTasks := mm.queue.CleanupOldTasks(taskRetention)
	removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)

	// Clean up stale pending operations (operations running for more than 4 hours)
	staleOperationTimeout := 4 * time.Hour
	removedOperations := 0
	if mm.scanner != nil && mm.scanner.integration != nil {
		pendingOps := mm.scanner.integration.GetPendingOperations()
		if pendingOps != nil {
			removedOperations = pendingOps.CleanupStaleOperations(staleOperationTimeout)
		}
	}

	if removedTasks > 0 || removedWorkers > 0 || removedOperations > 0 {
		glog.V(1).Infof("Cleanup completed: removed %d old tasks, %d stale workers, and %d stale operations",
			removedTasks, removedWorkers, removedOperations)
	}
}

// GetQueue returns the maintenance queue
func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
	return mm.queue
}

// GetConfig returns the maintenance configuration
func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
	return mm.config
}

// GetStats returns maintenance statistics
func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
	stats := mm.queue.GetStats()

	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	stats.LastScanTime = time.Now() // Would need to track this properly

	// Calculate the next scan time based on the current error state
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	nextScanInterval := scanInterval
	if mm.errorCount > 0 {
		nextScanInterval = mm.backoffDelay
		maxInterval := scanInterval * 10
		if nextScanInterval > maxInterval {
			nextScanInterval = maxInterval
		}
	}
	stats.NextScanTime = time.Now().Add(nextScanInterval)

	return stats
}

// ReloadTaskConfigurations reloads task configurations from the current policy
func (mm *MaintenanceManager) ReloadTaskConfigurations() error {
	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	// Trigger configuration reload in the integration layer
	if mm.scanner != nil && mm.scanner.integration != nil {
		mm.scanner.integration.ConfigureTasksFromPolicy()
		glog.V(1).Infof("Task configurations reloaded from policy")
		return nil
	}

	return fmt.Errorf("integration not available for configuration reload")
}

// GetErrorState returns the current error state for monitoring
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()
	return mm.errorCount, mm.lastError, mm.backoffDelay
}

// GetTasks returns tasks with filtering
func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
	return mm.queue.GetTasks(status, taskType, limit)
}

// GetWorkers returns all registered workers
func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
	return mm.queue.GetWorkers()
}

// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
	return mm.triggerScanInternal(true)
}

// triggerScanInternal handles both manual and automatic scan triggers
func (mm *MaintenanceManager) triggerScanInternal(isManual bool) error {
	if !mm.IsRunning() {
		return fmt.Errorf("maintenance manager is not running")
	}

	// Prevent multiple concurrent scans
	mm.mutex.Lock()
	if mm.scanInProgress {
		mm.mutex.Unlock()
		if isManual {
			glog.V(1).Infof("Manual scan already in progress, ignoring trigger request")
		} else {
			glog.V(2).Infof("Automatic scan already in progress, ignoring scheduled scan")
		}
		return fmt.Errorf("scan already in progress")
	}
	mm.scanInProgress = true
	mm.mutex.Unlock()

	go mm.performScan()
	return nil
}

// UpdateConfig updates the maintenance configuration
func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
	if config == nil {
		return fmt.Errorf("config cannot be nil")
	}

	mm.config = config
	mm.queue.policy = config.Policy
	mm.scanner.policy = config.Policy

	glog.V(1).Infof("Maintenance configuration updated")
	return nil
}

// CancelTask cancels a pending task
func (mm *MaintenanceManager) CancelTask(taskID string) error {
	mm.queue.mutex.Lock()
	defer mm.queue.mutex.Unlock()

	task, exists := mm.queue.tasks[taskID]
	if !exists {
		return fmt.Errorf("task %s not found", taskID)
	}

	if task.Status == TaskStatusPending {
		task.Status = TaskStatusCancelled
		completedAt := time.Now()
		task.CompletedAt = &completedAt

		// Remove from pending tasks
		for i, pendingTask := range mm.queue.pendingTasks {
			if pendingTask.ID == taskID {
				mm.queue.pendingTasks = append(mm.queue.pendingTasks[:i], mm.queue.pendingTasks[i+1:]...)
				break
			}
		}

		glog.V(2).Infof("Cancelled task %s", taskID)
		return nil
	}

	return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
}

// RegisterWorker registers a new worker
func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
	mm.queue.RegisterWorker(worker)
}

// GetNextTask returns the next task for a worker
func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
	return mm.queue.GetNextTask(workerID, capabilities)
}

// CompleteTask marks a task as completed
func (mm *MaintenanceManager) CompleteTask(taskID string, errorMsg string) {
	mm.queue.CompleteTask(taskID, errorMsg)
}

// UpdateTaskProgress updates task progress
func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
	mm.queue.UpdateTaskProgress(taskID, progress)
}

// UpdateWorkerHeartbeat updates worker heartbeat
func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
	mm.queue.UpdateWorkerHeartbeat(workerID)
}
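
// Illustrative worker-side sketch (not part of this file): a worker registered
// via RegisterWorker would typically poll for work and report back through the
// delegating methods above. The worker field names (ID, Capabilities) and the
// convention that an empty error string marks success are assumptions made for
// illustration only:
//
//	mgr.RegisterWorker(worker)
//	for {
//		mgr.UpdateWorkerHeartbeat(worker.ID)
//		if task := mgr.GetNextTask(worker.ID, worker.Capabilities); task != nil {
//			mgr.UpdateTaskProgress(task.ID, 50)
//			mgr.CompleteTask(task.ID, "") // assumed: empty string means no error
//		}
//		time.Sleep(10 * time.Second)
//	}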