store_vacuum.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package storage
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/stats"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  7. )
  8. func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
  9. if v := s.findVolume(volumeId); v != nil {
  10. glog.V(3).Infof("volume %d garbage level: %f", volumeId, v.garbageLevel())
  11. return v.garbageLevel(), nil
  12. }
  13. return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
  14. }
  15. func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
  16. if v := s.findVolume(vid); v != nil {
  17. // Get current volume size for space calculation
  18. volumeSize, indexSize, _ := v.FileStat()
  19. // Calculate space needed for compaction:
  20. // 1. Space for the new compacted volume (approximately same as current volume size)
  21. // 2. Use the larger of preallocate or estimated volume size
  22. estimatedCompactSize := int64(volumeSize + indexSize)
  23. spaceNeeded := preallocate
  24. if estimatedCompactSize > preallocate {
  25. spaceNeeded = estimatedCompactSize
  26. }
  27. diskStatus := stats.NewDiskStatus(v.dir)
  28. if int64(diskStatus.Free) < spaceNeeded {
  29. return fmt.Errorf("insufficient free space for compaction: need %d bytes (volume: %d, index: %d, buffer: 10%%), but only %d bytes available",
  30. spaceNeeded, volumeSize, indexSize, diskStatus.Free)
  31. }
  32. glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d",
  33. vid, volumeSize, indexSize, spaceNeeded, diskStatus.Free)
  34. return v.Compact2(preallocate, compactionBytePerSecond, progressFn)
  35. }
  36. return fmt.Errorf("volume id %d is not found during compact", vid)
  37. }
  38. func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, int64, error) {
  39. if s.isStopping {
  40. return false, 0, fmt.Errorf("volume id %d skips compact because volume is stopping", vid)
  41. }
  42. if v := s.findVolume(vid); v != nil {
  43. isReadOnly := v.IsReadOnly()
  44. err := v.CommitCompact()
  45. var volumeSize int64 = 0
  46. if err == nil && v.DataBackend != nil {
  47. volumeSize, _, _ = v.DataBackend.GetStat()
  48. }
  49. return isReadOnly, volumeSize, err
  50. }
  51. return false, 0, fmt.Errorf("volume id %d is not found during commit compact", vid)
  52. }
  53. func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error {
  54. if v := s.findVolume(vid); v != nil {
  55. return v.cleanupCompact()
  56. }
  57. return fmt.Errorf("volume id %d is not found during cleaning up", vid)
  58. }