command_ec_encode.go

package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/storage/types"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
func init() {
	Commands = append(Commands, &commandEcEncode{})
}

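// commandEcEncode implements the ec.encode shell command, which converts plain volumes into
// erasure-coded shards. A typical invocation from `weed shell` (mirroring the help text below)
// would be:
//
//	ec.encode -collection="^mybucket$" -fullPercent=95 -quietFor=1h -rebalance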
type commandEcEncode struct {
}

func (c *commandEcEncode) Name() string {
	return "ec.encode"
}

func (c *commandEcEncode) Help() string {
	return `apply erasure coding to a volume

	ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose]
	ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose]

	This command will:

	1. freeze one volume
	2. apply erasure coding to the volume
	3. (optionally) re-balance encoded shards across multiple volume servers

	The erasure coding is 10.4: each volume is split into 10 data shards and 4 parity shards.
	So ideally you have more than 14 volume servers, and you can afford to lose 4 volume servers.

	If the number of volume servers is not high, the worst case is that you only have 4 volume servers,
	and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.

	If you have fewer than 4 volume servers, with erasure coding you can at least afford to
	have 4 corrupted shard files.

	The -collection parameter supports regular expressions for pattern matching:
	- Use exact match: ec.encode -collection="^mybucket$"
	- Match multiple buckets: ec.encode -collection="bucket.*"
	- Match all collections: ec.encode -collection=".*"

	Options:
	-verbose: show detailed reasons why volumes are not selected for encoding

	Re-balancing algorithm:
` + ecBalanceAlgorithmDescription
}

func (c *commandEcEncode) HasTag(CommandTag) bool {
	return false
}

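// Do selects candidate volumes (either the explicit -volumeId, or all volumes matching the
// -collection pattern that are full and quiet enough), records their current locations,
// EC-encodes them, runs the EC shard re-balancing step (changes are applied only with
// -rebalance), and finally deletes the original volumes.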
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
	collection := encodeCommand.String("collection", "", "the collection name")
	fullPercentage := encodeCommand.Float64("fullPercent", 95, "select volumes that have reached this percentage of the max volume size")
	quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes with no writes for this period")
	maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has fewer than the recommended 4 nodes")
	shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
	applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
	verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
	if err = encodeCommand.Parse(args); err != nil {
		return nil
	}

	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement)
	if err != nil {
		return err
	}

	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}

	if !*forceChanges {
		var nodeCount int
		eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
			nodeCount++
		})
		if nodeCount < erasure_coding.ParityShardsCount {
			glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount)
			return nil
		}
	}

	var volumeIds []needle.VolumeId
	var balanceCollections []string
	if vid := needle.VolumeId(*volumeId); vid != 0 {
		// volumeId is provided
		volumeIds = append(volumeIds, vid)
		balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
	} else {
		// apply to all volumes for the given collection pattern (regex)
		volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
		if err != nil {
			return err
		}
	}

	// Collect volume locations BEFORE EC encoding starts, to avoid a race condition
	// where the master metadata is updated after EC encoding but before deletion.
	fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds))
	volumeLocationsMap, err := volumeLocations(commandEnv, volumeIds)
	if err != nil {
		return fmt.Errorf("failed to collect volume locations before EC encoding: %w", err)
	}

	// encode all requested volumes...
	if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
		return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)
	}
	// ...re-balance ec shards...
	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
		return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err)
	}
	// ...then delete original volumes using pre-collected locations.
	fmt.Printf("Deleting original volumes after EC encoding...\n")
	if err := doDeleteVolumesWithLocations(commandEnv, volumeIds, volumeLocationsMap, *maxParallelization); err != nil {
		return fmt.Errorf("delete original volumes after EC encoding: %w", err)
	}

	fmt.Printf("Successfully completed EC encoding for %d volumes\n", len(volumeIds))
	return nil
}

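// volumeLocations looks up the replica locations currently known to the master client for
// each of the given volume ids, and fails if any volume has no known location.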
func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[needle.VolumeId][]wdclient.Location, error) {
	res := map[needle.VolumeId][]wdclient.Location{}
	for _, vid := range volumeIds {
		ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
		if !ok {
			return nil, fmt.Errorf("volume %d not found", vid)
		}
		res[vid] = ls
	}
	return res, nil
}

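// doEcEncode EC-encodes the given volumes: every replica is first marked readonly, then one
// replica per volume generates the EC shard files, and finally all shards are mounted on that
// same server. Each step is fanned out through an ErrorWaitGroup bounded by maxParallelization.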
func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	locations, err := volumeLocations(commandEnv, volumeIds)
	if err != nil {
		return fmt.Errorf("failed to get volume locations for EC encoding: %w", err)
	}

	// mark volumes as readonly
	ewg := NewErrorWaitGroup(maxParallelization)
	for _, vid := range volumeIds {
		for _, l := range locations[vid] {
			ewg.Add(func() error {
				if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, l, false, false); err != nil {
					return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, l.Url, err)
				}
				return nil
			})
		}
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	// generate ec shards
	ewg.Reset()
	for i, vid := range volumeIds {
		target := locations[vid][i%len(locations[vid])]
		ewg.Add(func() error {
			if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
				return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
			}
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	// mount all ec shards for the converted volume
	shardIds := make([]uint32, erasure_coding.TotalShardsCount)
	for i := range shardIds {
		shardIds[i] = uint32(i)
	}
	ewg.Reset()
	for i, vid := range volumeIds {
		target := locations[vid][i%len(locations[vid])]
		ewg.Add(func() error {
			if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
				return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
			}
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	return nil
}

// doDeleteVolumesWithLocations deletes volumes using pre-collected location information.
// This avoids race conditions where master metadata is updated after EC encoding.
func doDeleteVolumesWithLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId, volumeLocationsMap map[needle.VolumeId][]wdclient.Location, maxParallelization int) error {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	ewg := NewErrorWaitGroup(maxParallelization)
	for _, vid := range volumeIds {
		locations, found := volumeLocationsMap[vid]
		if !found {
			fmt.Printf("warning: no locations found for volume %d, skipping deletion\n", vid)
			continue
		}
		for _, l := range locations {
			ewg.Add(func() error {
				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
				}
				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
				return nil
			})
		}
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	return nil
}

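// generateEcShards asks the volume server holding the volume to produce its EC shard files
// via the VolumeEcShardsGenerate gRPC call.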
func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {

	fmt.Printf("generateEcShards %d (collection %q) on %s ...\n", volumeId, collection, sourceVolumeServer)

	err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
		})
		return genErr
	})

	return err
}

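// collectVolumeIdsForEcEncode walks the cluster topology and returns the volumes eligible for
// EC encoding: the collection matches the collectionPattern regex, the optional disk type
// matches, the volume has had no writes for quietPeriod, it is at least fullPercentage% of the
// volume size limit, and every node it was seen on has at least 2 free volume slots. It also
// returns the sorted list of collections those volumes belong to.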
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, collectionPattern string, sourceDiskType *types.DiskType, fullPercentage float64, quietPeriod time.Duration, verbose bool) (vids []needle.VolumeId, matchedCollections []string, err error) {
	// compile regex pattern for collection matching
	collectionRegex, err := compileCollectionPattern(collectionPattern)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid collection pattern '%s': %v", collectionPattern, err)
	}

	// collect topology information
	topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	quietSeconds := int64(quietPeriod / time.Second)
	nowUnixSeconds := time.Now().Unix()

	fmt.Printf("collect volumes with collection pattern '%s', quiet for: %d seconds and %.1f%% full\n", collectionPattern, quietSeconds, fullPercentage)

	// Statistics for verbose mode
	var (
		totalVolumes    int
		remoteVolumes   int
		wrongCollection int
		wrongDiskType   int
		tooRecent       int
		tooSmall        int
		noFreeDisk      int
	)

	vidMap := make(map[uint32]bool)
	collectionSet := make(map[string]bool)
	eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		for _, diskInfo := range dn.DiskInfos {
			for _, v := range diskInfo.VolumeInfos {
				totalVolumes++
				// ignore remote volumes
				if v.RemoteStorageName != "" && v.RemoteStorageKey != "" {
					remoteVolumes++
					if verbose {
						fmt.Printf("skip volume %d on %s: remote volume (storage: %s, key: %s)\n",
							v.Id, dn.Id, v.RemoteStorageName, v.RemoteStorageKey)
					}
					continue
				}
				// check collection against regex pattern
				if !collectionRegex.MatchString(v.Collection) {
					wrongCollection++
					if verbose {
						fmt.Printf("skip volume %d on %s: collection doesn't match pattern (pattern: %s, actual: %s)\n",
							v.Id, dn.Id, collectionPattern, v.Collection)
					}
					continue
				}
				// track matched collection
				collectionSet[v.Collection] = true
				// check disk type
				if sourceDiskType != nil && types.ToDiskType(v.DiskType) != *sourceDiskType {
					wrongDiskType++
					if verbose {
						fmt.Printf("skip volume %d on %s: wrong disk type (expected: %s, actual: %s)\n",
							v.Id, dn.Id, sourceDiskType.ReadableString(), types.ToDiskType(v.DiskType).ReadableString())
					}
					continue
				}
				// check quiet period
				if v.ModifiedAtSecond+quietSeconds >= nowUnixSeconds {
					tooRecent++
					if verbose {
						fmt.Printf("skip volume %d on %s: too recently modified (last modified: %d seconds ago, required: %d seconds)\n",
							v.Id, dn.Id, nowUnixSeconds-v.ModifiedAtSecond, quietSeconds)
					}
					continue
				}
				// check size
				sizeThreshold := fullPercentage / 100 * float64(volumeSizeLimitMb) * 1024 * 1024
				if float64(v.Size) <= sizeThreshold {
					tooSmall++
					if verbose {
						fmt.Printf("skip volume %d on %s: too small (size: %.1f MB, threshold: %.1f MB, %.1f%% full)\n",
							v.Id, dn.Id, float64(v.Size)/(1024*1024), sizeThreshold/(1024*1024),
							float64(v.Size)*100/(float64(volumeSizeLimitMb)*1024*1024))
					}
					continue
				}
				// check free disk space
				if good, found := vidMap[v.Id]; found {
					if good {
						if diskInfo.FreeVolumeCount < 2 {
							glog.V(0).Infof("skip %s %d on %s, no free disk", v.Collection, v.Id, dn.Id)
							if verbose {
								fmt.Printf("skip volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n",
									v.Id, dn.Id, diskInfo.FreeVolumeCount)
							}
							vidMap[v.Id] = false
							noFreeDisk++
						}
					}
				} else {
					if diskInfo.FreeVolumeCount < 2 {
						glog.V(0).Infof("skip %s %d on %s, no free disk", v.Collection, v.Id, dn.Id)
						if verbose {
							fmt.Printf("skip volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n",
								v.Id, dn.Id, diskInfo.FreeVolumeCount)
						}
						vidMap[v.Id] = false
						noFreeDisk++
					} else {
						if verbose {
							fmt.Printf("selected volume %d on %s: size %.1f MB (%.1f%% full), last modified %d seconds ago, free volumes: %d\n",
								v.Id, dn.Id, float64(v.Size)/(1024*1024),
								float64(v.Size)*100/(float64(volumeSizeLimitMb)*1024*1024),
								nowUnixSeconds-v.ModifiedAtSecond, diskInfo.FreeVolumeCount)
						}
						vidMap[v.Id] = true
					}
				}
			}
		}
	})

	for vid, good := range vidMap {
		if good {
			vids = append(vids, needle.VolumeId(vid))
		}
	}

	// Convert collection set to slice
	for collection := range collectionSet {
		matchedCollections = append(matchedCollections, collection)
	}
	sort.Strings(matchedCollections)

	// Print summary statistics in verbose mode or when no volumes selected
	if verbose || len(vids) == 0 {
		fmt.Printf("\nVolume selection summary:\n")
		fmt.Printf("  Total volumes examined: %d\n", totalVolumes)
		fmt.Printf("  Selected for encoding: %d\n", len(vids))
		fmt.Printf("  Collections matched: %v\n", matchedCollections)
		if totalVolumes > 0 {
			fmt.Printf("\nReasons for exclusion:\n")
			if remoteVolumes > 0 {
				fmt.Printf("  Remote volumes: %d\n", remoteVolumes)
			}
			if wrongCollection > 0 {
				fmt.Printf("  Collection doesn't match pattern: %d\n", wrongCollection)
			}
			if wrongDiskType > 0 {
				fmt.Printf("  Wrong disk type: %d\n", wrongDiskType)
			}
			if tooRecent > 0 {
				fmt.Printf("  Too recently modified: %d\n", tooRecent)
			}
			if tooSmall > 0 {
				fmt.Printf("  Too small (< %.1f%% full): %d\n", fullPercentage, tooSmall)
			}
			if noFreeDisk > 0 {
				fmt.Printf("  Insufficient free disk space: %d\n", noFreeDisk)
			}
		}
		fmt.Println()
	}

	return
}