command_s3_bucket_quota_check.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package shell
  2. import (
  3. "bytes"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "io"
  10. "math"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandS3BucketQuotaEnforce{})
  14. }
  // commandS3BucketQuotaEnforce is the shell command "s3.bucket.quota.enforce".
  // It has no fields; all inputs arrive through the arguments of Do.
  type commandS3BucketQuotaEnforce struct{}
  17. func (c *commandS3BucketQuotaEnforce) Name() string {
  18. return "s3.bucket.quota.enforce"
  19. }
  20. func (c *commandS3BucketQuotaEnforce) Help() string {
  21. return `check quota for all buckets, make the bucket read only if over the limit
  22. Example:
  23. s3.bucket.quota.enforce -apply
  24. `
  25. }
  26. func (c *commandS3BucketQuotaEnforce) HasTag(CommandTag) bool {
  27. return false
  28. }
  29. func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  30. bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  31. applyQuotaLimit := bucketCommand.Bool("apply", false, "actually change the buckets readonly attribute")
  32. if err = bucketCommand.Parse(args); err != nil {
  33. return nil
  34. }
  35. infoAboutSimulationMode(writer, *applyQuotaLimit, "-apply")
  36. // collect collection information
  37. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  38. if err != nil {
  39. return err
  40. }
  41. collectionInfos := make(map[string]*CollectionInfo)
  42. collectCollectionInfo(topologyInfo, collectionInfos)
  43. // read buckets path
  44. var filerBucketsPath string
  45. filerBucketsPath, err = readFilerBucketsPath(commandEnv)
  46. if err != nil {
  47. return fmt.Errorf("read buckets: %w", err)
  48. }
  49. // read existing filer configuration
  50. fc, err := filer.ReadFilerConf(commandEnv.option.FilerAddress, commandEnv.option.GrpcDialOption, commandEnv.MasterClient)
  51. if err != nil {
  52. return err
  53. }
  54. // process each bucket
  55. hasConfChanges := false
  56. err = filer_pb.List(context.Background(), commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  57. if !entry.IsDirectory {
  58. return nil
  59. }
  60. collection := getCollectionName(commandEnv, entry.Name)
  61. var collectionSize float64
  62. if collectionInfo, found := collectionInfos[collection]; found {
  63. collectionSize = collectionInfo.Size
  64. }
  65. if c.processEachBucket(fc, filerBucketsPath, entry, writer, collectionSize) {
  66. hasConfChanges = true
  67. }
  68. return nil
  69. }, "", false, math.MaxUint32)
  70. if err != nil {
  71. return fmt.Errorf("list buckets under %v: %w", filerBucketsPath, err)
  72. }
  73. // apply the configuration changes
  74. if hasConfChanges && *applyQuotaLimit {
  75. var buf2 bytes.Buffer
  76. fc.ToText(&buf2)
  77. if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  78. return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
  79. }); err != nil && err != filer_pb.ErrNotFound {
  80. return err
  81. }
  82. }
  83. return err
  84. }
  85. func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize float64) (hasConfChanges bool) {
  86. locPrefix := filerBucketsPath + "/" + entry.Name + "/"
  87. locConf := fc.MatchStorageRule(locPrefix)
  88. locConf.LocationPrefix = locPrefix
  89. if entry.Quota > 0 {
  90. if locConf.ReadOnly {
  91. if collectionSize < float64(entry.Quota) {
  92. locConf.ReadOnly = false
  93. hasConfChanges = true
  94. }
  95. } else {
  96. if collectionSize > float64(entry.Quota) {
  97. locConf.ReadOnly = true
  98. hasConfChanges = true
  99. }
  100. }
  101. } else {
  102. if locConf.ReadOnly {
  103. locConf.ReadOnly = false
  104. hasConfChanges = true
  105. }
  106. }
  107. if hasConfChanges {
  108. fmt.Fprintf(writer, " %s\tsize:%.0f", entry.Name, collectionSize)
  109. fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, collectionSize*100/float64(entry.Quota))
  110. fmt.Fprintln(writer)
  111. if locConf.ReadOnly {
  112. fmt.Fprintf(writer, " changing bucket %s to read only!\n", entry.Name)
  113. } else {
  114. fmt.Fprintf(writer, " changing bucket %s to writable.\n", entry.Name)
  115. }
  116. fc.SetLocationConf(locConf)
  117. }
  118. return
  119. }