command_s3_clean_uploads.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "math"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  10. "github.com/seaweedfs/seaweedfs/weed/security"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  14. )
  15. func init() {
  16. Commands = append(Commands, &commandS3CleanUploads{})
  17. }
  18. type commandS3CleanUploads struct{}
  19. func (c *commandS3CleanUploads) Name() string {
  20. return "s3.clean.uploads"
  21. }
  22. func (c *commandS3CleanUploads) Help() string {
  23. return `clean up stale multipart uploads
  24. Example:
  25. s3.clean.uploads -timeAgo 1.5h
  26. `
  27. }
  28. func (c *commandS3CleanUploads) HasTag(CommandTag) bool {
  29. return false
  30. }
  31. func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  32. bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  33. uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
  34. if err = bucketCommand.Parse(args); err != nil {
  35. return nil
  36. }
  37. signingKey := util.GetViper().GetString("jwt.filer_signing.key")
  38. var filerBucketsPath string
  39. filerBucketsPath, err = readFilerBucketsPath(commandEnv)
  40. if err != nil {
  41. return fmt.Errorf("read buckets: %w", err)
  42. }
  43. var buckets []string
  44. err = filer_pb.List(context.Background(), commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  45. buckets = append(buckets, entry.Name)
  46. return nil
  47. }, "", false, math.MaxUint32)
  48. if err != nil {
  49. return fmt.Errorf("list buckets under %v: %w", filerBucketsPath, err)
  50. }
  51. for _, bucket := range buckets {
  52. if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil {
  53. fmt.Fprintf(writer, "failed cleanup uploads for bucket %s: %v", bucket, err)
  54. }
  55. }
  56. return err
  57. }
  58. func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error {
  59. uploadsDir := filerBucketsPath + "/" + bucket + "/" + s3_constants.MultipartUploadsFolder
  60. var staleUploads []string
  61. now := time.Now()
  62. err := filer_pb.List(context.Background(), commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  63. ctime := time.Unix(entry.Attributes.Crtime, 0)
  64. if ctime.Add(timeAgo).Before(now) {
  65. staleUploads = append(staleUploads, entry.Name)
  66. }
  67. return nil
  68. }, "", false, math.MaxUint32)
  69. if err != nil {
  70. return fmt.Errorf("list uploads under %v: %w", uploadsDir, err)
  71. }
  72. var encodedJwt security.EncodedJwt
  73. if signingKey != "" {
  74. encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60)
  75. }
  76. for _, staleUpload := range staleUploads {
  77. deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
  78. fmt.Fprintf(writer, "purge %s\n", deleteUrl)
  79. err = util_http.Delete(deleteUrl, string(encodedJwt))
  80. if err != nil && err.Error() != "" {
  81. return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
  82. }
  83. }
  84. return nil
  85. }