register.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package vacuum
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
  8. "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
  9. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  10. )
  11. // Global variable to hold the task definition for configuration updates
  12. var globalTaskDef *base.TaskDefinition
  13. // Auto-register this task when the package is imported
  14. func init() {
  15. RegisterVacuumTask()
  16. // Register config updater
  17. tasks.AutoRegisterConfigUpdater(types.TaskTypeVacuum, UpdateConfigFromPersistence)
  18. }
  19. // RegisterVacuumTask registers the vacuum task with the new architecture
  20. func RegisterVacuumTask() {
  21. // Create configuration instance
  22. config := NewDefaultConfig()
  23. // Create complete task definition
  24. taskDef := &base.TaskDefinition{
  25. Type: types.TaskTypeVacuum,
  26. Name: "vacuum",
  27. DisplayName: "Volume Vacuum",
  28. Description: "Reclaims disk space by removing deleted files from volumes",
  29. Icon: "fas fa-broom text-primary",
  30. Capabilities: []string{"vacuum", "storage"},
  31. Config: config,
  32. ConfigSpec: GetConfigSpec(),
  33. CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
  34. if params == nil {
  35. return nil, fmt.Errorf("task parameters are required")
  36. }
  37. if len(params.Sources) == 0 {
  38. return nil, fmt.Errorf("at least one source is required for vacuum task")
  39. }
  40. return NewVacuumTask(
  41. fmt.Sprintf("vacuum-%d", params.VolumeId),
  42. params.Sources[0].Node, // Use first source node
  43. params.VolumeId,
  44. params.Collection,
  45. ), nil
  46. },
  47. DetectionFunc: Detection,
  48. ScanInterval: 2 * time.Hour,
  49. SchedulingFunc: Scheduling,
  50. MaxConcurrent: 2,
  51. RepeatInterval: 7 * 24 * time.Hour,
  52. }
  53. // Store task definition globally for configuration updates
  54. globalTaskDef = taskDef
  55. // Register everything with a single function call!
  56. base.RegisterTask(taskDef)
  57. }
  58. // UpdateConfigFromPersistence updates the vacuum configuration from persistence
  59. func UpdateConfigFromPersistence(configPersistence interface{}) error {
  60. if globalTaskDef == nil {
  61. return fmt.Errorf("vacuum task not registered")
  62. }
  63. // Load configuration from persistence
  64. newConfig := LoadConfigFromPersistence(configPersistence)
  65. if newConfig == nil {
  66. return fmt.Errorf("failed to load configuration from persistence")
  67. }
  68. // Update the task definition's config
  69. globalTaskDef.Config = newConfig
  70. glog.V(1).Infof("Updated vacuum task configuration from persistence")
  71. return nil
  72. }