execution.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package balance
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
  8. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  9. )
  10. // TypedTask implements balance operation with typed protobuf parameters
  11. type TypedTask struct {
  12. *base.BaseTypedTask
  13. // Task state from protobuf
  14. sourceServer string
  15. destNode string
  16. volumeID uint32
  17. collection string
  18. estimatedSize uint64
  19. forceMove bool
  20. timeoutSeconds int32
  21. }
  22. // NewTypedTask creates a new typed balance task
  23. func NewTypedTask() types.TypedTaskInterface {
  24. task := &TypedTask{
  25. BaseTypedTask: base.NewBaseTypedTask(types.TaskTypeBalance),
  26. }
  27. return task
  28. }
  29. // ValidateTyped validates the typed parameters for balance task
  30. func (t *TypedTask) ValidateTyped(params *worker_pb.TaskParams) error {
  31. // Basic validation from base class
  32. if err := t.BaseTypedTask.ValidateTyped(params); err != nil {
  33. return err
  34. }
  35. // Check that we have balance-specific parameters
  36. balanceParams := params.GetBalanceParams()
  37. if balanceParams == nil {
  38. return fmt.Errorf("balance_params is required for balance task")
  39. }
  40. // Validate sources and targets
  41. if len(params.Sources) == 0 {
  42. return fmt.Errorf("at least one source is required for balance task")
  43. }
  44. if len(params.Targets) == 0 {
  45. return fmt.Errorf("at least one target is required for balance task")
  46. }
  47. // Validate that source and target have volume IDs
  48. if params.Sources[0].VolumeId == 0 {
  49. return fmt.Errorf("source volume_id is required for balance task")
  50. }
  51. if params.Targets[0].VolumeId == 0 {
  52. return fmt.Errorf("target volume_id is required for balance task")
  53. }
  54. // Validate timeout
  55. if balanceParams.TimeoutSeconds <= 0 {
  56. return fmt.Errorf("timeout_seconds must be greater than 0")
  57. }
  58. return nil
  59. }
  60. // EstimateTimeTyped estimates the time needed for balance operation based on protobuf parameters
  61. func (t *TypedTask) EstimateTimeTyped(params *worker_pb.TaskParams) time.Duration {
  62. balanceParams := params.GetBalanceParams()
  63. if balanceParams != nil {
  64. // Use the timeout from parameters if specified
  65. if balanceParams.TimeoutSeconds > 0 {
  66. return time.Duration(balanceParams.TimeoutSeconds) * time.Second
  67. }
  68. }
  69. // Estimate based on volume size from sources (1 minute per GB)
  70. if len(params.Sources) > 0 {
  71. source := params.Sources[0]
  72. if source.EstimatedSize > 0 {
  73. gbSize := source.EstimatedSize / (1024 * 1024 * 1024)
  74. return time.Duration(gbSize) * time.Minute
  75. }
  76. }
  77. // Default estimation
  78. return 10 * time.Minute
  79. }
  80. // ExecuteTyped implements the balance operation with typed parameters
  81. func (t *TypedTask) ExecuteTyped(params *worker_pb.TaskParams) error {
  82. // Extract basic parameters
  83. t.volumeID = params.VolumeId
  84. t.collection = params.Collection
  85. // Ensure sources and targets are present (should be guaranteed by validation)
  86. if len(params.Sources) == 0 {
  87. return fmt.Errorf("at least one source is required for balance task (ExecuteTyped)")
  88. }
  89. if len(params.Targets) == 0 {
  90. return fmt.Errorf("at least one target is required for balance task (ExecuteTyped)")
  91. }
  92. // Extract source and target information
  93. t.sourceServer = params.Sources[0].Node
  94. t.estimatedSize = params.Sources[0].EstimatedSize
  95. t.destNode = params.Targets[0].Node
  96. // Extract balance-specific parameters
  97. balanceParams := params.GetBalanceParams()
  98. if balanceParams != nil {
  99. t.forceMove = balanceParams.ForceMove
  100. t.timeoutSeconds = balanceParams.TimeoutSeconds
  101. }
  102. glog.Infof("Starting typed balance task for volume %d: %s -> %s (collection: %s, size: %d bytes)",
  103. t.volumeID, t.sourceServer, t.destNode, t.collection, t.estimatedSize)
  104. // Simulate balance operation with progress updates
  105. steps := []struct {
  106. name string
  107. duration time.Duration
  108. progress float64
  109. }{
  110. {"Analyzing cluster state", 2 * time.Second, 15},
  111. {"Verifying destination capacity", 1 * time.Second, 25},
  112. {"Starting volume migration", 1 * time.Second, 35},
  113. {"Moving volume data", 6 * time.Second, 75},
  114. {"Updating cluster metadata", 2 * time.Second, 95},
  115. {"Verifying balance completion", 1 * time.Second, 100},
  116. }
  117. for _, step := range steps {
  118. if t.IsCancelled() {
  119. return fmt.Errorf("balance task cancelled during: %s", step.name)
  120. }
  121. glog.V(1).Infof("Balance task step: %s", step.name)
  122. t.SetProgress(step.progress)
  123. // Simulate work
  124. time.Sleep(step.duration)
  125. }
  126. glog.Infof("Typed balance task completed successfully for volume %d: %s -> %s",
  127. t.volumeID, t.sourceServer, t.destNode)
  128. return nil
  129. }
  130. // Register the typed task in the global registry
  131. func init() {
  132. types.RegisterGlobalTypedTask(types.TaskTypeBalance, NewTypedTask)
  133. glog.V(1).Infof("Registered typed balance task")
  134. }