register.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
  8. "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
  9. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  10. )
  11. // Global variable to hold the task definition for configuration updates
  12. var globalTaskDef *base.TaskDefinition
  13. // Auto-register this task when the package is imported
  14. func init() {
  15. RegisterErasureCodingTask()
  16. // Register config updater
  17. tasks.AutoRegisterConfigUpdater(types.TaskTypeErasureCoding, UpdateConfigFromPersistence)
  18. }
  19. // RegisterErasureCodingTask registers the erasure coding task with the new architecture
  20. func RegisterErasureCodingTask() {
  21. // Create configuration instance
  22. config := NewDefaultConfig()
  23. // Create complete task definition
  24. taskDef := &base.TaskDefinition{
  25. Type: types.TaskTypeErasureCoding,
  26. Name: "erasure_coding",
  27. DisplayName: "Erasure Coding",
  28. Description: "Applies erasure coding to volumes for data protection",
  29. Icon: "fas fa-shield-alt text-success",
  30. Capabilities: []string{"erasure_coding", "data_protection"},
  31. Config: config,
  32. ConfigSpec: GetConfigSpec(),
  33. CreateTask: func(params *worker_pb.TaskParams) (types.Task, error) {
  34. if params == nil {
  35. return nil, fmt.Errorf("task parameters are required")
  36. }
  37. if len(params.Sources) == 0 {
  38. return nil, fmt.Errorf("at least one source is required for erasure coding task")
  39. }
  40. return NewErasureCodingTask(
  41. fmt.Sprintf("erasure_coding-%d", params.VolumeId),
  42. params.Sources[0].Node, // Use first source node
  43. params.VolumeId,
  44. params.Collection,
  45. ), nil
  46. },
  47. DetectionFunc: Detection,
  48. ScanInterval: 1 * time.Hour,
  49. SchedulingFunc: Scheduling,
  50. MaxConcurrent: 1,
  51. RepeatInterval: 24 * time.Hour,
  52. }
  53. // Store task definition globally for configuration updates
  54. globalTaskDef = taskDef
  55. // Register everything with a single function call!
  56. base.RegisterTask(taskDef)
  57. }
  58. // UpdateConfigFromPersistence updates the erasure coding configuration from persistence
  59. func UpdateConfigFromPersistence(configPersistence interface{}) error {
  60. if globalTaskDef == nil {
  61. return fmt.Errorf("erasure coding task not registered")
  62. }
  63. // Load configuration from persistence
  64. newConfig := LoadConfigFromPersistence(configPersistence)
  65. if newConfig == nil {
  66. return fmt.Errorf("failed to load configuration from persistence")
  67. }
  68. // Update the task definition's config
  69. globalTaskDef.Config = newConfig
  70. glog.V(1).Infof("Updated erasure coding task configuration from persistence")
  71. return nil
  72. }