command_fs_meta_change_volume_id.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "io"
  10. "os"
  11. "strconv"
  12. "strings"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandFsMetaChangeVolumeId{})
  16. }
  // commandFsMetaChangeVolumeId implements the "fs.meta.changeVolumeId" shell
  // command. It holds no state; everything it needs arrives through Do.
  type commandFsMetaChangeVolumeId struct{}
  19. func (c *commandFsMetaChangeVolumeId) Name() string {
  20. return "fs.meta.changeVolumeId"
  21. }
  22. func (c *commandFsMetaChangeVolumeId) Help() string {
  23. return `change volume id in existing metadata.
  24. fs.meta.changeVolumeId -dir=/path/to/a/dir -fromVolumeId=x -toVolumeId=y -force
  25. fs.meta.changeVolumeId -dir=/path/to/a/dir -mapping=/path/to/mapping/file -force
  26. The mapping file should have these lines, each line is: [fromVolumeId]=>[toVolumeId]
  27. e.g.
  28. 1 => 2
  29. 3 => 4
  30. `
  31. }
  32. func (c *commandFsMetaChangeVolumeId) HasTag(CommandTag) bool {
  33. return false
  34. }
  35. func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  36. fsMetaChangeVolumeIdCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  37. dir := fsMetaChangeVolumeIdCommand.String("dir", "/", "fix all metadata under this folder")
  38. mappingFileName := fsMetaChangeVolumeIdCommand.String("mapping", "", "a file with multiple volume id changes, with each line as x=>y")
  39. fromVolumeId := fsMetaChangeVolumeIdCommand.Uint("fromVolumeId", 0, "change metadata with this volume id")
  40. toVolumeId := fsMetaChangeVolumeIdCommand.Uint("toVolumeId", 0, "change metadata to this volume id")
  41. isForce := fsMetaChangeVolumeIdCommand.Bool("force", false, "applying the metadata changes")
  42. if err = fsMetaChangeVolumeIdCommand.Parse(args); err != nil {
  43. return err
  44. }
  45. // load the mapping
  46. mapping := make(map[needle.VolumeId]needle.VolumeId)
  47. if *mappingFileName != "" {
  48. readMappingFromFile(*mappingFileName, mapping)
  49. } else {
  50. if *fromVolumeId == *toVolumeId {
  51. return fmt.Errorf("no volume id changes")
  52. }
  53. if *fromVolumeId == 0 || *toVolumeId == 0 {
  54. return fmt.Errorf("volume id can not be zero")
  55. }
  56. mapping[needle.VolumeId(*fromVolumeId)] = needle.VolumeId(*toVolumeId)
  57. }
  58. return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  59. return filer_pb.TraverseBfs(commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  60. if !entry.IsDirectory {
  61. var hasChanges bool
  62. for _, chunk := range entry.Chunks {
  63. if chunk.IsChunkManifest {
  64. fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
  65. return
  66. }
  67. chunkVolumeId := chunk.Fid.VolumeId
  68. if toVolumeId, found := mapping[needle.VolumeId(chunkVolumeId)]; found {
  69. hasChanges = true
  70. chunk.Fid.VolumeId = uint32(toVolumeId)
  71. chunk.FileId = ""
  72. }
  73. }
  74. if hasChanges {
  75. println("Updating", parentPath, entry.Name)
  76. if *isForce {
  77. if updateErr := filer_pb.UpdateEntry(context.Background(), client, &filer_pb.UpdateEntryRequest{
  78. Directory: string(parentPath),
  79. Entry: entry,
  80. }); updateErr != nil {
  81. fmt.Printf("failed to update %s/%s: %v\n", parentPath, entry.Name, updateErr)
  82. }
  83. }
  84. }
  85. }
  86. })
  87. })
  88. }
  89. func readMappingFromFile(filename string, mapping map[needle.VolumeId]needle.VolumeId) error {
  90. mappingFile, openErr := os.Open(filename)
  91. if openErr != nil {
  92. return fmt.Errorf("failed to open file %s: %v", filename, openErr)
  93. }
  94. defer mappingFile.Close()
  95. mappingContent, readErr := io.ReadAll(mappingFile)
  96. if readErr != nil {
  97. return fmt.Errorf("failed to read file %s: %v", filename, readErr)
  98. }
  99. for _, line := range strings.Split(string(mappingContent), "\n") {
  100. parts := strings.Split(line, "=>")
  101. if len(parts) != 2 {
  102. println("unrecognized line:", line)
  103. continue
  104. }
  105. x, errX := strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 64)
  106. if errX != nil {
  107. return errX
  108. }
  109. y, errY := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64)
  110. if errY != nil {
  111. return errY
  112. }
  113. mapping[needle.VolumeId(x)] = needle.VolumeId(y)
  114. }
  115. return nil
  116. }