package maintenance

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
	policy := &worker_pb.MaintenancePolicy{
		GlobalMaxConcurrent:          4,
		DefaultRepeatIntervalSeconds: 6 * 3600,  // 6 hours in seconds
		DefaultCheckIntervalSeconds:  12 * 3600, // 12 hours in seconds
		TaskPolicies:                 make(map[string]*worker_pb.TaskPolicy),
	}

	// Load vacuum task configuration
	if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
		policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
			Enabled:               vacuumConfig.Enabled,
			MaxConcurrent:         int32(vacuumConfig.MaxConcurrent),
			RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
			CheckIntervalSeconds:  int32(vacuumConfig.ScanIntervalSeconds),
			TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
				VacuumConfig: &worker_pb.VacuumTaskConfig{
					GarbageThreshold:   float64(vacuumConfig.GarbageThreshold),
					MinVolumeAgeHours:  int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // convert seconds to hours
					MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
				},
			},
		}
	}

	// Load erasure coding task configuration
	if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
		policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
			Enabled:               ecConfig.Enabled,
			MaxConcurrent:         int32(ecConfig.MaxConcurrent),
			RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
			CheckIntervalSeconds:  int32(ecConfig.ScanIntervalSeconds),
			TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
				ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
					FullnessRatio:    float64(ecConfig.FullnessRatio),
					QuietForSeconds:  int32(ecConfig.QuietForSeconds),
					MinVolumeSizeMb:  int32(ecConfig.MinSizeMB),
					CollectionFilter: ecConfig.CollectionFilter,
				},
			},
		}
	}

	// Load balance task configuration
	if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
		policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
			Enabled:               balanceConfig.Enabled,
			MaxConcurrent:         int32(balanceConfig.MaxConcurrent),
			RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
			CheckIntervalSeconds:  int32(balanceConfig.ScanIntervalSeconds),
			TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
				BalanceConfig: &worker_pb.BalanceTaskConfig{
					ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
					MinServerCount:     int32(balanceConfig.MinServerCount),
				},
			},
		}
	}

	glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
	return policy
}

// MaintenanceManager coordinates the maintenance system
type MaintenanceManager struct {
	config      *MaintenanceConfig
	scanner     *MaintenanceScanner
	queue       *MaintenanceQueue
	adminClient AdminClient
	running     bool
	stopChan    chan struct{}

	// Error handling and backoff
	errorCount     int
	lastError      error
	lastErrorTime  time.Time
	backoffDelay   time.Duration
	mutex          sync.RWMutex
	scanInProgress bool
}

// NewMaintenanceManager creates a new maintenance manager
func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
	if config == nil {
		config = DefaultMaintenanceConfig()
	}

	// Use the policy from the config (populated from separate task files in LoadMaintenanceConfig)
	policy := config.Policy
	if policy == nil {
		// Fallback: build the policy from the separate task configuration files if it was not already populated
		policy = buildPolicyFromTaskConfigs()
	}

	queue := NewMaintenanceQueue(policy)
	scanner := NewMaintenanceScanner(adminClient, policy, queue)

	return &MaintenanceManager{
		config:       config,
		scanner:      scanner,
		queue:        queue,
		adminClient:  adminClient,
		stopChan:     make(chan struct{}),
		backoffDelay: time.Second, // start with a 1-second backoff
	}
}

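// Typical lifecycle (illustrative sketch only; assumes the caller already has an
// AdminClient implementation in a variable named adminClient):
//
//	mgr := NewMaintenanceManager(adminClient, DefaultMaintenanceConfig())
//	if err := mgr.Start(); err != nil {
//		glog.Fatalf("failed to start maintenance manager: %v", err)
//	}
//	defer mgr.Stop()
//	_ = mgr.TriggerScan() // optional: force an immediate scan
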
// Start begins the maintenance manager
func (mm *MaintenanceManager) Start() error {
	if !mm.config.Enabled {
		glog.V(1).Infof("Maintenance system is disabled")
		return nil
	}

	// Validate configuration durations to prevent ticker panics
	if err := mm.validateConfig(); err != nil {
		return fmt.Errorf("invalid maintenance configuration: %w", err)
	}

	mm.running = true

	// Start background processes
	go mm.scanLoop()
	go mm.cleanupLoop()

	glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
	return nil
}

// validateConfig validates the maintenance configuration durations
func (mm *MaintenanceManager) validateConfig() error {
	if mm.config.ScanIntervalSeconds <= 0 {
		glog.Warningf("Invalid scan interval %ds, using default 30m", mm.config.ScanIntervalSeconds)
		mm.config.ScanIntervalSeconds = 30 * 60 // 30 minutes in seconds
	}
	if mm.config.CleanupIntervalSeconds <= 0 {
		glog.Warningf("Invalid cleanup interval %ds, using default 24h", mm.config.CleanupIntervalSeconds)
		mm.config.CleanupIntervalSeconds = 24 * 60 * 60 // 24 hours in seconds
	}
	if mm.config.WorkerTimeoutSeconds <= 0 {
		glog.Warningf("Invalid worker timeout %ds, using default 5m", mm.config.WorkerTimeoutSeconds)
		mm.config.WorkerTimeoutSeconds = 5 * 60 // 5 minutes in seconds
	}
	if mm.config.TaskTimeoutSeconds <= 0 {
		glog.Warningf("Invalid task timeout %ds, using default 2h", mm.config.TaskTimeoutSeconds)
		mm.config.TaskTimeoutSeconds = 2 * 60 * 60 // 2 hours in seconds
	}
	if mm.config.RetryDelaySeconds <= 0 {
		glog.Warningf("Invalid retry delay %ds, using default 15m", mm.config.RetryDelaySeconds)
		mm.config.RetryDelaySeconds = 15 * 60 // 15 minutes in seconds
	}
	if mm.config.TaskRetentionSeconds <= 0 {
		glog.Warningf("Invalid task retention %ds, using default 168h", mm.config.TaskRetentionSeconds)
		mm.config.TaskRetentionSeconds = 7 * 24 * 60 * 60 // 7 days in seconds
	}
	return nil
}

// IsRunning returns whether the maintenance manager is currently running
func (mm *MaintenanceManager) IsRunning() bool {
	return mm.running
}

// Stop terminates the maintenance manager
func (mm *MaintenanceManager) Stop() {
	mm.running = false
	close(mm.stopChan)
	glog.Infof("Maintenance manager stopped")
}

// scanLoop periodically scans for maintenance tasks with adaptive timing
func (mm *MaintenanceManager) scanLoop() {
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	ticker := time.NewTicker(scanInterval)
	// Stop whichever ticker is current when the loop exits (the ticker may be
	// replaced below when the backoff interval changes).
	defer func() { ticker.Stop() }()

	for mm.running {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			glog.V(1).Infof("Performing scheduled maintenance scan (interval: %v)", scanInterval)
			// Use the same synchronization as TriggerScan to prevent concurrent scans
			if err := mm.triggerScanInternal(false); err != nil {
				glog.V(1).Infof("Scheduled scan skipped: %v", err)
			}

			// Adjust the ticker interval based on the current error state (read safely)
			currentInterval := mm.getScanInterval(scanInterval)

			// Reset the ticker with the new interval if it changed
			if currentInterval != scanInterval {
				ticker.Stop()
				ticker = time.NewTicker(currentInterval)
			}
		}
	}
}

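// While scans are failing, scanLoop swaps its ticker to the (usually shorter)
// backoff delay returned here, so retries happen sooner than the configured
// scan interval; once a scan succeeds the error count resets and the normal
// interval is restored.
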
// getScanInterval safely reads the current scan interval with error backoff
func (mm *MaintenanceManager) getScanInterval(baseInterval time.Duration) time.Duration {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	if mm.errorCount > 0 {
		// Use the backoff delay while errors persist, but never exceed 10x the configured interval
		currentInterval := mm.backoffDelay
		if maxInterval := baseInterval * 10; currentInterval > maxInterval {
			currentInterval = maxInterval
		}
		return currentInterval
	}
	return baseInterval
}

// cleanupLoop periodically cleans up old tasks and stale workers
func (mm *MaintenanceManager) cleanupLoop() {
	cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	for mm.running {
		select {
		case <-mm.stopChan:
			return
		case <-ticker.C:
			mm.performCleanup()
		}
	}
}

// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
	defer func() {
		// Always clear the scan-in-progress flag when done
		mm.mutex.Lock()
		mm.scanInProgress = false
		mm.mutex.Unlock()
	}()

	glog.Infof("Starting maintenance scan...")

	results, err := mm.scanner.ScanForMaintenanceTasks()
	if err != nil {
		// Handle scan error
		mm.mutex.Lock()
		mm.handleScanError(err)
		mm.mutex.Unlock()
		glog.Warningf("Maintenance scan failed: %v", err)
		return
	}

	// Scan succeeded - update state and process results
	mm.handleScanSuccess(results)
}

// handleScanSuccess processes successful scan results with proper lock management
func (mm *MaintenanceManager) handleScanSuccess(results []*TaskDetectionResult) {
	// Update manager state first
	mm.mutex.Lock()
	mm.resetErrorTracking()
	taskCount := len(results)
	mm.mutex.Unlock()

	if taskCount > 0 {
		// Count tasks by type for logging (outside of the lock)
		taskCounts := make(map[MaintenanceTaskType]int)
		for _, result := range results {
			taskCounts[result.TaskType]++
		}

		// Add tasks to the queue (no manager lock held)
		mm.queue.AddTasksFromResults(results)

		// Log detailed scan results
		glog.Infof("Maintenance scan completed: found %d tasks", taskCount)
		for taskType, count := range taskCounts {
			glog.Infof(" - %s: %d tasks", taskType, count)
		}
	} else {
		glog.Infof("Maintenance scan completed: no maintenance tasks needed")
	}
}

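// Backoff behaviour: the delay starts at one second (set in NewMaintenanceManager),
// doubles on every consecutive failure, and is capped at five minutes; scanLoop
// additionally caps the effective interval at 10x the configured scan interval via
// getScanInterval. Callers must hold mm.mutex when invoking handleScanError.
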
// handleScanError handles scan errors with exponential backoff and reduced logging
func (mm *MaintenanceManager) handleScanError(err error) {
	now := time.Now()
	mm.errorCount++
	mm.lastError = err
	mm.lastErrorTime = now

	// Use exponential backoff, capped at 5 minutes
	if mm.errorCount > 1 {
		mm.backoffDelay = mm.backoffDelay * 2
		if mm.backoffDelay > 5*time.Minute {
			mm.backoffDelay = 5 * time.Minute
		}
	}

	// Reduce log frequency based on the error count
	shouldLog := false
	if mm.errorCount <= 3 {
		// Log the first 3 errors immediately
		shouldLog = true
	} else if mm.errorCount <= 10 && mm.errorCount%3 == 0 {
		// Log every 3rd error for errors 4-10
		shouldLog = true
	} else if mm.errorCount%10 == 0 {
		// Log every 10th error after that
		shouldLog = true
	}

	if shouldLog {
		// Check if it's a connection error to provide better messaging
		if isConnectionError(err) {
			if mm.errorCount == 1 {
				glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
			} else {
				glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
					mm.errorCount, err, mm.backoffDelay)
			}
		} else {
			glog.Errorf("Maintenance scan failed: %v", err)
		}
	} else {
		// Use debug level for suppressed errors
		glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
	}
}

// resetErrorTracking resets error tracking when a scan succeeds
func (mm *MaintenanceManager) resetErrorTracking() {
	if mm.errorCount > 0 {
		glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
		mm.errorCount = 0
		mm.lastError = nil
		mm.backoffDelay = time.Second // reset to the initial delay
	}
}

// isConnectionError checks if the error is a connection-related error
func isConnectionError(err error) bool {
	if err == nil {
		return false
	}

	errStr := err.Error()
	return strings.Contains(errStr, "connection refused") ||
		strings.Contains(errStr, "connection error") ||
		strings.Contains(errStr, "dial tcp") ||
		strings.Contains(errStr, "connection timeout") ||
		strings.Contains(errStr, "no route to host") ||
		strings.Contains(errStr, "network unreachable")
}

// performCleanup cleans up old tasks and stale workers
func (mm *MaintenanceManager) performCleanup() {
	glog.V(2).Infof("Starting maintenance cleanup")

	taskRetention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
	workerTimeout := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second

	removedTasks := mm.queue.CleanupOldTasks(taskRetention)
	removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)

	// Clean up stale pending operations (operations running for more than 4 hours)
	staleOperationTimeout := 4 * time.Hour
	removedOperations := 0
	if mm.scanner != nil && mm.scanner.integration != nil {
		pendingOps := mm.scanner.integration.GetPendingOperations()
		if pendingOps != nil {
			removedOperations = pendingOps.CleanupStaleOperations(staleOperationTimeout)
		}
	}

	if removedTasks > 0 || removedWorkers > 0 || removedOperations > 0 {
		glog.V(1).Infof("Cleanup completed: removed %d old tasks, %d stale workers, and %d stale operations",
			removedTasks, removedWorkers, removedOperations)
	}
}

// GetQueue returns the maintenance queue
func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
	return mm.queue
}

// GetConfig returns the maintenance configuration
func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
	return mm.config
}

// GetStats returns maintenance statistics
func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
	stats := mm.queue.GetStats()

	mm.mutex.RLock()
	defer mm.mutex.RUnlock()

	stats.LastScanTime = time.Now() // would need to track this properly

	// Calculate the next scan time based on the current error state
	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
	nextScanInterval := scanInterval
	if mm.errorCount > 0 {
		nextScanInterval = mm.backoffDelay
		maxInterval := scanInterval * 10
		if nextScanInterval > maxInterval {
			nextScanInterval = maxInterval
		}
	}
	stats.NextScanTime = time.Now().Add(nextScanInterval)

	return stats
}

// ReloadTaskConfigurations reloads task configurations from the current policy
func (mm *MaintenanceManager) ReloadTaskConfigurations() error {
	mm.mutex.Lock()
	defer mm.mutex.Unlock()

	// Trigger a configuration reload in the integration layer
	if mm.scanner != nil && mm.scanner.integration != nil {
		mm.scanner.integration.ConfigureTasksFromPolicy()
		glog.V(1).Infof("Task configurations reloaded from policy")
		return nil
	}

	return fmt.Errorf("integration not available for configuration reload")
}

// GetErrorState returns the current error state for monitoring
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
	mm.mutex.RLock()
	defer mm.mutex.RUnlock()
	return mm.errorCount, mm.lastError, mm.backoffDelay
}

// GetTasks returns tasks with filtering
func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
	return mm.queue.GetTasks(status, taskType, limit)
}

// GetWorkers returns all registered workers
func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
	return mm.queue.GetWorkers()
}

// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
	return mm.triggerScanInternal(true)
}

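// Both the manual TriggerScan and the scheduled scanLoop funnel through
// triggerScanInternal: the scanInProgress flag is set here under mm.mutex and
// cleared by performScan's deferred unlock, so at most one scan runs at a time.
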
// triggerScanInternal handles both manual and automatic scan triggers
func (mm *MaintenanceManager) triggerScanInternal(isManual bool) error {
	if !mm.running {
		return fmt.Errorf("maintenance manager is not running")
	}

	// Prevent multiple concurrent scans
	mm.mutex.Lock()
	if mm.scanInProgress {
		mm.mutex.Unlock()
		if isManual {
			glog.V(1).Infof("Manual scan already in progress, ignoring trigger request")
		} else {
			glog.V(2).Infof("Automatic scan already in progress, ignoring scheduled scan")
		}
		return fmt.Errorf("scan already in progress")
	}
	mm.scanInProgress = true
	mm.mutex.Unlock()

	go mm.performScan()
	return nil
}

// UpdateConfig updates the maintenance configuration
func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
	if config == nil {
		return fmt.Errorf("config cannot be nil")
	}

	mm.config = config
	mm.queue.policy = config.Policy
	mm.scanner.policy = config.Policy

	glog.V(1).Infof("Maintenance configuration updated")
	return nil
}

// CancelTask cancels a pending task
func (mm *MaintenanceManager) CancelTask(taskID string) error {
	mm.queue.mutex.Lock()
	defer mm.queue.mutex.Unlock()

	task, exists := mm.queue.tasks[taskID]
	if !exists {
		return fmt.Errorf("task %s not found", taskID)
	}

	if task.Status == TaskStatusPending {
		task.Status = TaskStatusCancelled
		completedAt := time.Now()
		task.CompletedAt = &completedAt

		// Remove the task from the pending list
		for i, pendingTask := range mm.queue.pendingTasks {
			if pendingTask.ID == taskID {
				mm.queue.pendingTasks = append(mm.queue.pendingTasks[:i], mm.queue.pendingTasks[i+1:]...)
				break
			}
		}

		glog.V(2).Infof("Cancelled task %s", taskID)
		return nil
	}

	return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
}

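// The methods below form the worker-facing surface of the manager; they simply
// delegate to the underlying MaintenanceQueue.
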
// RegisterWorker registers a new worker
func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
	mm.queue.RegisterWorker(worker)
}

// GetNextTask returns the next task for a worker
func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
	return mm.queue.GetNextTask(workerID, capabilities)
}

// CompleteTask marks a task as completed
func (mm *MaintenanceManager) CompleteTask(taskID string, error string) {
	mm.queue.CompleteTask(taskID, error)
}

// UpdateTaskProgress updates task progress
func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
	mm.queue.UpdateTaskProgress(taskID, progress)
}

// UpdateWorkerHeartbeat updates worker heartbeat
func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
	mm.queue.UpdateWorkerHeartbeat(workerID)
}