command_ec_decode.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package shell
import (
	"context"
	"flag"
	"fmt"
	"io"
	"sort"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
)
  16. func init() {
  17. Commands = append(Commands, &commandEcDecode{})
  18. }
  19. type commandEcDecode struct {
  20. }
  21. func (c *commandEcDecode) Name() string {
  22. return "ec.decode"
  23. }
  24. func (c *commandEcDecode) Help() string {
  25. return `decode a erasure coded volume into a normal volume
  26. ec.decode [-collection=""] [-volumeId=<volume_id>]
  27. The -collection parameter supports regular expressions for pattern matching:
  28. - Use exact match: ec.decode -collection="^mybucket$"
  29. - Match multiple buckets: ec.decode -collection="bucket.*"
  30. - Match all collections: ec.decode -collection=".*"
  31. `
  32. }
  33. func (c *commandEcDecode) HasTag(CommandTag) bool {
  34. return false
  35. }
  36. func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  37. decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  38. volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
  39. collection := decodeCommand.String("collection", "", "the collection name")
  40. if err = decodeCommand.Parse(args); err != nil {
  41. return nil
  42. }
  43. if err = commandEnv.confirmIsLocked(args); err != nil {
  44. return
  45. }
  46. vid := needle.VolumeId(*volumeId)
  47. // collect topology information
  48. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  49. if err != nil {
  50. return err
  51. }
  52. // volumeId is provided
  53. if vid != 0 {
  54. return doEcDecode(commandEnv, topologyInfo, *collection, vid)
  55. }
  56. // apply to all volumes in the collection
  57. volumeIds, err := collectEcShardIds(topologyInfo, *collection)
  58. if err != nil {
  59. return err
  60. }
  61. fmt.Printf("ec decode volumes: %v\n", volumeIds)
  62. for _, vid := range volumeIds {
  63. if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
  64. return err
  65. }
  66. }
  67. return nil
  68. }
  69. func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
  70. if !commandEnv.isLocked() {
  71. return fmt.Errorf("lock is lost")
  72. }
  73. // find volume location
  74. nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
  75. fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
  76. // collect ec shards to the server with most space
  77. targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
  78. if err != nil {
  79. return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
  80. }
  81. // generate a normal volume
  82. err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
  83. if err != nil {
  84. return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
  85. }
  86. // delete the previous ec shards
  87. err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
  88. if err != nil {
  89. return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
  90. }
  91. return nil
  92. }
  93. func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
  94. // mount volume
  95. if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  96. _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
  97. VolumeId: uint32(vid),
  98. })
  99. return mountErr
  100. }); err != nil {
  101. return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
  102. }
  103. // unmount ec shards
  104. for location, ecIndexBits := range nodeToEcIndexBits {
  105. fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  106. err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
  107. if err != nil {
  108. return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
  109. }
  110. }
  111. // delete ec shards
  112. for location, ecIndexBits := range nodeToEcIndexBits {
  113. fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  114. err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
  115. if err != nil {
  116. return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
  117. }
  118. }
  119. return nil
  120. }
  121. func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
  122. fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
  123. err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  124. _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
  125. VolumeId: uint32(vid),
  126. Collection: collection,
  127. })
  128. return genErr
  129. })
  130. return err
  131. }
  132. func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
  133. maxShardCount := 0
  134. var existingEcIndexBits erasure_coding.ShardBits
  135. for loc, ecIndexBits := range nodeToEcIndexBits {
  136. toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
  137. if toBeCopiedShardCount > maxShardCount {
  138. maxShardCount = toBeCopiedShardCount
  139. targetNodeLocation = loc
  140. existingEcIndexBits = ecIndexBits
  141. }
  142. }
  143. fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
  144. var copiedEcIndexBits erasure_coding.ShardBits
  145. for loc, ecIndexBits := range nodeToEcIndexBits {
  146. if loc == targetNodeLocation {
  147. continue
  148. }
  149. needToCopyEcIndexBits := ecIndexBits.Minus(existingEcIndexBits).MinusParityShards()
  150. if needToCopyEcIndexBits.ShardIdCount() == 0 {
  151. continue
  152. }
  153. err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  154. fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
  155. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  156. VolumeId: uint32(vid),
  157. Collection: collection,
  158. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  159. CopyEcxFile: false,
  160. CopyEcjFile: true,
  161. CopyVifFile: true,
  162. SourceDataNode: string(loc),
  163. })
  164. if copyErr != nil {
  165. return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
  166. }
  167. fmt.Printf("mount %d.%v on %s\n", vid, needToCopyEcIndexBits.ShardIds(), targetNodeLocation)
  168. _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
  169. VolumeId: uint32(vid),
  170. Collection: collection,
  171. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  172. })
  173. if mountErr != nil {
  174. return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), targetNodeLocation, mountErr)
  175. }
  176. return nil
  177. })
  178. if err != nil {
  179. break
  180. }
  181. copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
  182. }
  183. nodeToEcIndexBits[targetNodeLocation] = existingEcIndexBits.Plus(copiedEcIndexBits)
  184. return targetNodeLocation, err
  185. }
  186. func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
  187. var resp *master_pb.LookupVolumeResponse
  188. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  189. resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
  190. return err
  191. })
  192. if err != nil {
  193. return nil, err
  194. }
  195. return resp.VolumeIdLocations, nil
  196. }
  197. func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern string) (vids []needle.VolumeId, err error) {
  198. // compile regex pattern for collection matching
  199. collectionRegex, err := compileCollectionPattern(collectionPattern)
  200. if err != nil {
  201. return nil, fmt.Errorf("invalid collection pattern '%s': %v", collectionPattern, err)
  202. }
  203. vidMap := make(map[uint32]bool)
  204. eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
  205. if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
  206. for _, v := range diskInfo.EcShardInfos {
  207. if collectionRegex.MatchString(v.Collection) {
  208. vidMap[v.Id] = true
  209. }
  210. }
  211. }
  212. })
  213. for vid := range vidMap {
  214. vids = append(vids, needle.VolumeId(vid))
  215. }
  216. return
  217. }
  218. func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
  219. nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
  220. eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
  221. if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
  222. for _, v := range diskInfo.EcShardInfos {
  223. if v.Id == uint32(vid) {
  224. nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
  225. }
  226. }
  227. }
  228. })
  229. return nodeToEcIndexBits
  230. }