detection.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package vacuum
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
  8. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  9. )
  10. // Detection implements the detection logic for vacuum tasks
  11. func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
  12. if !config.IsEnabled() {
  13. return nil, nil
  14. }
  15. vacuumConfig := config.(*Config)
  16. var results []*types.TaskDetectionResult
  17. minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second
  18. debugCount := 0
  19. skippedDueToGarbage := 0
  20. skippedDueToAge := 0
  21. for _, metric := range metrics {
  22. // Check if volume needs vacuum
  23. if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge {
  24. priority := types.TaskPriorityNormal
  25. if metric.GarbageRatio > 0.6 {
  26. priority = types.TaskPriorityHigh
  27. }
  28. // Generate task ID for future ActiveTopology integration
  29. taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix())
  30. result := &types.TaskDetectionResult{
  31. TaskID: taskID, // For future ActiveTopology integration
  32. TaskType: types.TaskTypeVacuum,
  33. VolumeID: metric.VolumeID,
  34. Server: metric.Server,
  35. Collection: metric.Collection,
  36. Priority: priority,
  37. Reason: "Volume has excessive garbage requiring vacuum",
  38. ScheduleAt: time.Now(),
  39. }
  40. // Create typed parameters for vacuum task
  41. result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo)
  42. results = append(results, result)
  43. } else {
  44. // Debug why volume was not selected
  45. if debugCount < 5 { // Limit debug output to first 5 volumes
  46. if metric.GarbageRatio < vacuumConfig.GarbageThreshold {
  47. skippedDueToGarbage++
  48. }
  49. if metric.Age < minVolumeAge {
  50. skippedDueToAge++
  51. }
  52. }
  53. debugCount++
  54. }
  55. }
  56. // Log debug summary if no tasks were created
  57. if len(results) == 0 && len(metrics) > 0 {
  58. totalVolumes := len(metrics)
  59. glog.Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage<threshold), %d (age<minimum)",
  60. totalVolumes, vacuumConfig.GarbageThreshold*100, minVolumeAge, skippedDueToGarbage, skippedDueToAge)
  61. // Show details for first few volumes
  62. for i, metric := range metrics {
  63. if i >= 3 { // Limit to first 3 volumes
  64. break
  65. }
  66. glog.Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)",
  67. metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100,
  68. metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute))
  69. }
  70. }
  71. return results, nil
  72. }
  73. // createVacuumTaskParams creates typed parameters for vacuum tasks
  74. // This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic
  75. func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams {
  76. // Use configured values or defaults
  77. garbageThreshold := 0.3 // Default 30%
  78. verifyChecksum := true // Default to verify
  79. batchSize := int32(1000) // Default batch size
  80. workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory
  81. if vacuumConfig != nil {
  82. garbageThreshold = vacuumConfig.GarbageThreshold
  83. // Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds
  84. // Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added
  85. // to the protobuf definition if they should be configurable
  86. }
  87. // Use DC and rack information directly from VolumeHealthMetrics
  88. sourceDC, sourceRack := metric.DataCenter, metric.Rack
  89. // Create typed protobuf parameters with unified sources
  90. return &worker_pb.TaskParams{
  91. TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated)
  92. VolumeId: task.VolumeID,
  93. Collection: task.Collection,
  94. VolumeSize: metric.Size, // Store original volume size for tracking changes
  95. // Unified sources array
  96. Sources: []*worker_pb.TaskSource{
  97. {
  98. Node: task.Server,
  99. VolumeId: task.VolumeID,
  100. EstimatedSize: metric.Size,
  101. DataCenter: sourceDC,
  102. Rack: sourceRack,
  103. },
  104. },
  105. TaskParams: &worker_pb.TaskParams_VacuumParams{
  106. VacuumParams: &worker_pb.VacuumTaskParams{
  107. GarbageThreshold: garbageThreshold,
  108. ForceVacuum: false,
  109. BatchSize: batchSize,
  110. WorkingDir: workingDir,
  111. VerifyChecksum: verifyChecksum,
  112. },
  113. },
  114. }
  115. }