balance_task.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package balance
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  15. "github.com/seaweedfs/seaweedfs/weed/worker/types/base"
  16. "google.golang.org/grpc"
  17. )
  18. // BalanceTask implements the Task interface
  19. type BalanceTask struct {
  20. *base.BaseTask
  21. server string
  22. volumeID uint32
  23. collection string
  24. progress float64
  25. }
  26. // NewBalanceTask creates a new balance task instance
  27. func NewBalanceTask(id string, server string, volumeID uint32, collection string) *BalanceTask {
  28. return &BalanceTask{
  29. BaseTask: base.NewBaseTask(id, types.TaskTypeBalance),
  30. server: server,
  31. volumeID: volumeID,
  32. collection: collection,
  33. }
  34. }
  35. // Execute implements the Task interface
  36. func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
  37. if params == nil {
  38. return fmt.Errorf("task parameters are required")
  39. }
  40. balanceParams := params.GetBalanceParams()
  41. if balanceParams == nil {
  42. return fmt.Errorf("balance parameters are required")
  43. }
  44. // Get source and destination from unified arrays
  45. if len(params.Sources) == 0 {
  46. return fmt.Errorf("source is required for balance task")
  47. }
  48. if len(params.Targets) == 0 {
  49. return fmt.Errorf("target is required for balance task")
  50. }
  51. sourceNode := params.Sources[0].Node
  52. destNode := params.Targets[0].Node
  53. if sourceNode == "" {
  54. return fmt.Errorf("source node is required for balance task")
  55. }
  56. if destNode == "" {
  57. return fmt.Errorf("destination node is required for balance task")
  58. }
  59. t.GetLogger().WithFields(map[string]interface{}{
  60. "volume_id": t.volumeID,
  61. "source": sourceNode,
  62. "destination": destNode,
  63. "collection": t.collection,
  64. }).Info("Starting balance task - moving volume")
  65. sourceServer := pb.ServerAddress(sourceNode)
  66. targetServer := pb.ServerAddress(destNode)
  67. volumeId := needle.VolumeId(t.volumeID)
  68. // Step 1: Mark volume readonly
  69. t.ReportProgress(10.0)
  70. t.GetLogger().Info("Marking volume readonly for move")
  71. if err := t.markVolumeReadonly(sourceServer, volumeId); err != nil {
  72. return fmt.Errorf("failed to mark volume readonly: %v", err)
  73. }
  74. // Step 2: Copy volume to destination
  75. t.ReportProgress(20.0)
  76. t.GetLogger().Info("Copying volume to destination")
  77. lastAppendAtNs, err := t.copyVolume(sourceServer, targetServer, volumeId)
  78. if err != nil {
  79. return fmt.Errorf("failed to copy volume: %v", err)
  80. }
  81. // Step 3: Mount volume on target and mark it readonly
  82. t.ReportProgress(60.0)
  83. t.GetLogger().Info("Mounting volume on target server")
  84. if err := t.mountVolume(targetServer, volumeId); err != nil {
  85. return fmt.Errorf("failed to mount volume on target: %v", err)
  86. }
  87. // Step 4: Tail for updates
  88. t.ReportProgress(70.0)
  89. t.GetLogger().Info("Syncing final updates")
  90. if err := t.tailVolume(sourceServer, targetServer, volumeId, lastAppendAtNs); err != nil {
  91. glog.Warningf("Tail operation failed (may be normal): %v", err)
  92. }
  93. // Step 5: Unmount from source
  94. t.ReportProgress(85.0)
  95. t.GetLogger().Info("Unmounting volume from source server")
  96. if err := t.unmountVolume(sourceServer, volumeId); err != nil {
  97. return fmt.Errorf("failed to unmount volume from source: %v", err)
  98. }
  99. // Step 6: Delete from source
  100. t.ReportProgress(95.0)
  101. t.GetLogger().Info("Deleting volume from source server")
  102. if err := t.deleteVolume(sourceServer, volumeId); err != nil {
  103. return fmt.Errorf("failed to delete volume from source: %v", err)
  104. }
  105. t.ReportProgress(100.0)
  106. glog.Infof("Balance task completed successfully: volume %d moved from %s to %s",
  107. t.volumeID, t.server, destNode)
  108. return nil
  109. }
  110. // Validate implements the UnifiedTask interface
  111. func (t *BalanceTask) Validate(params *worker_pb.TaskParams) error {
  112. if params == nil {
  113. return fmt.Errorf("task parameters are required")
  114. }
  115. balanceParams := params.GetBalanceParams()
  116. if balanceParams == nil {
  117. return fmt.Errorf("balance parameters are required")
  118. }
  119. if params.VolumeId != t.volumeID {
  120. return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
  121. }
  122. // Validate that at least one source matches our server
  123. found := false
  124. for _, source := range params.Sources {
  125. if source.Node == t.server {
  126. found = true
  127. break
  128. }
  129. }
  130. if !found {
  131. return fmt.Errorf("no source matches expected server %s", t.server)
  132. }
  133. return nil
  134. }
  135. // EstimateTime implements the UnifiedTask interface
  136. func (t *BalanceTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
  137. // Basic estimate based on simulated steps
  138. return 14 * time.Second // Sum of all step durations
  139. }
  140. // GetProgress returns current progress
  141. func (t *BalanceTask) GetProgress() float64 {
  142. return t.progress
  143. }
  144. // Helper methods for real balance operations
  145. // markVolumeReadonly marks the volume readonly
  146. func (t *BalanceTask) markVolumeReadonly(server pb.ServerAddress, volumeId needle.VolumeId) error {
  147. return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
  148. func(client volume_server_pb.VolumeServerClient) error {
  149. _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
  150. VolumeId: uint32(volumeId),
  151. })
  152. return err
  153. })
  154. }
  155. // copyVolume copies volume from source to target server
  156. func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId) (uint64, error) {
  157. var lastAppendAtNs uint64
  158. err := operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(),
  159. func(client volume_server_pb.VolumeServerClient) error {
  160. stream, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
  161. VolumeId: uint32(volumeId),
  162. SourceDataNode: string(sourceServer),
  163. })
  164. if err != nil {
  165. return err
  166. }
  167. for {
  168. resp, recvErr := stream.Recv()
  169. if recvErr != nil {
  170. if recvErr == io.EOF {
  171. break
  172. }
  173. return recvErr
  174. }
  175. if resp.LastAppendAtNs != 0 {
  176. lastAppendAtNs = resp.LastAppendAtNs
  177. } else {
  178. // Report copy progress
  179. glog.V(1).Infof("Volume %d copy progress: %s", volumeId,
  180. util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
  181. }
  182. }
  183. return nil
  184. })
  185. return lastAppendAtNs, err
  186. }
  187. // mountVolume mounts the volume on the target server
  188. func (t *BalanceTask) mountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error {
  189. return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
  190. func(client volume_server_pb.VolumeServerClient) error {
  191. _, err := client.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
  192. VolumeId: uint32(volumeId),
  193. })
  194. return err
  195. })
  196. }
  197. // tailVolume syncs remaining updates from source to target
  198. func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId, sinceNs uint64) error {
  199. return operation.WithVolumeServerClient(true, targetServer, grpc.WithInsecure(),
  200. func(client volume_server_pb.VolumeServerClient) error {
  201. _, err := client.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
  202. VolumeId: uint32(volumeId),
  203. SinceNs: sinceNs,
  204. IdleTimeoutSeconds: 60, // 1 minute timeout
  205. SourceVolumeServer: string(sourceServer),
  206. })
  207. return err
  208. })
  209. }
  210. // unmountVolume unmounts the volume from the server
  211. func (t *BalanceTask) unmountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error {
  212. return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
  213. func(client volume_server_pb.VolumeServerClient) error {
  214. _, err := client.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
  215. VolumeId: uint32(volumeId),
  216. })
  217. return err
  218. })
  219. }
  220. // deleteVolume deletes the volume from the server
  221. func (t *BalanceTask) deleteVolume(server pb.ServerAddress, volumeId needle.VolumeId) error {
  222. return operation.WithVolumeServerClient(false, server, grpc.WithInsecure(),
  223. func(client volume_server_pb.VolumeServerClient) error {
  224. _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
  225. VolumeId: uint32(volumeId),
  226. OnlyEmpty: false,
  227. })
  228. return err
  229. })
  230. }