vacuum_task.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package vacuum
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/worker/types"
  13. "github.com/seaweedfs/seaweedfs/weed/worker/types/base"
  14. "google.golang.org/grpc"
  15. )
// VacuumTask implements the Task interface for vacuuming one volume on one
// volume server.
type VacuumTask struct {
	// Embedded base task, constructed with types.TaskTypeVacuum in NewVacuumTask.
	// Presumably the source of GetLogger and ReportProgress used by the
	// methods in this file — confirm against the base package.
	*base.BaseTask
	// server is the volume server address; it is converted to
	// pb.ServerAddress whenever a volume server client is opened.
	server string
	// volumeID is the volume sent in every vacuum RPC request.
	volumeID uint32
	// collection is stored at construction and included in the log fields
	// written when Execute starts.
	collection string
	// garbageThreshold is the garbage ratio at or above which the volume is
	// vacuumed. NewVacuumTask sets it to 0.3; Execute overwrites it with the
	// value from the task parameters.
	garbageThreshold float64
	// progress is the value returned by GetProgress.
	// NOTE(review): nothing visible in this file assigns it — confirm it is
	// updated elsewhere, otherwise GetProgress always reports 0.
	progress float64
}
  25. // NewVacuumTask creates a new unified vacuum task instance
  26. func NewVacuumTask(id string, server string, volumeID uint32, collection string) *VacuumTask {
  27. return &VacuumTask{
  28. BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum),
  29. server: server,
  30. volumeID: volumeID,
  31. collection: collection,
  32. garbageThreshold: 0.3, // Default 30% threshold
  33. }
  34. }
  35. // Execute implements the UnifiedTask interface
  36. func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
  37. if params == nil {
  38. return fmt.Errorf("task parameters are required")
  39. }
  40. vacuumParams := params.GetVacuumParams()
  41. if vacuumParams == nil {
  42. return fmt.Errorf("vacuum parameters are required")
  43. }
  44. t.garbageThreshold = vacuumParams.GarbageThreshold
  45. t.GetLogger().WithFields(map[string]interface{}{
  46. "volume_id": t.volumeID,
  47. "server": t.server,
  48. "collection": t.collection,
  49. "garbage_threshold": t.garbageThreshold,
  50. }).Info("Starting vacuum task")
  51. // Step 1: Check volume status and garbage ratio
  52. t.ReportProgress(10.0)
  53. t.GetLogger().Info("Checking volume status")
  54. eligible, currentGarbageRatio, err := t.checkVacuumEligibility()
  55. if err != nil {
  56. return fmt.Errorf("failed to check vacuum eligibility: %v", err)
  57. }
  58. if !eligible {
  59. t.GetLogger().WithFields(map[string]interface{}{
  60. "current_garbage_ratio": currentGarbageRatio,
  61. "required_threshold": t.garbageThreshold,
  62. }).Info("Volume does not meet vacuum criteria, skipping")
  63. t.ReportProgress(100.0)
  64. return nil
  65. }
  66. // Step 2: Perform vacuum operation
  67. t.ReportProgress(50.0)
  68. t.GetLogger().WithFields(map[string]interface{}{
  69. "garbage_ratio": currentGarbageRatio,
  70. "threshold": t.garbageThreshold,
  71. }).Info("Performing vacuum operation")
  72. if err := t.performVacuum(); err != nil {
  73. return fmt.Errorf("failed to perform vacuum: %v", err)
  74. }
  75. // Step 3: Verify vacuum results
  76. t.ReportProgress(90.0)
  77. t.GetLogger().Info("Verifying vacuum results")
  78. if err := t.verifyVacuumResults(); err != nil {
  79. glog.Warningf("Vacuum verification failed: %v", err)
  80. // Don't fail the task - vacuum operation itself succeeded
  81. }
  82. t.ReportProgress(100.0)
  83. glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)",
  84. t.volumeID, t.server, currentGarbageRatio*100)
  85. return nil
  86. }
  87. // Validate implements the UnifiedTask interface
  88. func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error {
  89. if params == nil {
  90. return fmt.Errorf("task parameters are required")
  91. }
  92. vacuumParams := params.GetVacuumParams()
  93. if vacuumParams == nil {
  94. return fmt.Errorf("vacuum parameters are required")
  95. }
  96. if params.VolumeId != t.volumeID {
  97. return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
  98. }
  99. // Validate that at least one source matches our server
  100. found := false
  101. for _, source := range params.Sources {
  102. if source.Node == t.server {
  103. found = true
  104. break
  105. }
  106. }
  107. if !found {
  108. return fmt.Errorf("no source matches expected server %s", t.server)
  109. }
  110. if vacuumParams.GarbageThreshold < 0 || vacuumParams.GarbageThreshold > 1.0 {
  111. return fmt.Errorf("invalid garbage threshold: %f (must be between 0.0 and 1.0)", vacuumParams.GarbageThreshold)
  112. }
  113. return nil
  114. }
  115. // EstimateTime implements the UnifiedTask interface
  116. func (t *VacuumTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
  117. // Basic estimate based on simulated steps
  118. return 14 * time.Second // Sum of all step durations
  119. }
  120. // GetProgress returns current progress
  121. func (t *VacuumTask) GetProgress() float64 {
  122. return t.progress
  123. }
  124. // Helper methods for real vacuum operations
  125. // checkVacuumEligibility checks if the volume meets vacuum criteria
  126. func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
  127. var garbageRatio float64
  128. err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
  129. func(client volume_server_pb.VolumeServerClient) error {
  130. resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
  131. VolumeId: t.volumeID,
  132. })
  133. if err != nil {
  134. return fmt.Errorf("failed to check volume vacuum status: %v", err)
  135. }
  136. garbageRatio = resp.GarbageRatio
  137. return nil
  138. })
  139. if err != nil {
  140. return false, 0, err
  141. }
  142. eligible := garbageRatio >= t.garbageThreshold
  143. glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v",
  144. t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible)
  145. return eligible, garbageRatio, nil
  146. }
  147. // performVacuum executes the actual vacuum operation
  148. func (t *VacuumTask) performVacuum() error {
  149. return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
  150. func(client volume_server_pb.VolumeServerClient) error {
  151. // Step 1: Compact the volume
  152. t.GetLogger().Info("Compacting volume")
  153. stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
  154. VolumeId: t.volumeID,
  155. })
  156. if err != nil {
  157. return fmt.Errorf("vacuum compact failed: %v", err)
  158. }
  159. // Read compact progress
  160. for {
  161. resp, recvErr := stream.Recv()
  162. if recvErr != nil {
  163. if recvErr == io.EOF {
  164. break
  165. }
  166. return fmt.Errorf("vacuum compact stream error: %v", recvErr)
  167. }
  168. glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
  169. }
  170. // Step 2: Commit the vacuum
  171. t.GetLogger().Info("Committing vacuum operation")
  172. _, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
  173. VolumeId: t.volumeID,
  174. })
  175. if err != nil {
  176. return fmt.Errorf("vacuum commit failed: %v", err)
  177. }
  178. // Step 3: Cleanup old files
  179. t.GetLogger().Info("Cleaning up vacuum files")
  180. _, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
  181. VolumeId: t.volumeID,
  182. })
  183. if err != nil {
  184. return fmt.Errorf("vacuum cleanup failed: %v", err)
  185. }
  186. glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID)
  187. return nil
  188. })
  189. }
  190. // verifyVacuumResults checks the volume status after vacuum
  191. func (t *VacuumTask) verifyVacuumResults() error {
  192. return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
  193. func(client volume_server_pb.VolumeServerClient) error {
  194. resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
  195. VolumeId: t.volumeID,
  196. })
  197. if err != nil {
  198. return fmt.Errorf("failed to verify vacuum results: %v", err)
  199. }
  200. postVacuumGarbageRatio := resp.GarbageRatio
  201. glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%",
  202. t.volumeID, postVacuumGarbageRatio*100)
  203. return nil
  204. })
  205. }