package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sort"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
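
// init registers the ec.encode command with the shell's command registry.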
func init() {
	Commands = append(Commands, &commandEcEncode{})
}

type commandEcEncode struct {
}

func (c *commandEcEncode) Name() string {
	return "ec.encode"
}
func (c *commandEcEncode) Help() string {
	return `apply erasure coding to a volume

	ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h] [-verbose]
	ec.encode [-collection=""] [-volumeId=<volume_id>] [-verbose]

	This command will:

		1. freeze one volume
		2. apply erasure coding to the volume
		3. (optionally) re-balance encoded shards across multiple volume servers

	The erasure coding is 10+4: each volume is split into 10 data shards and 4 parity shards.
	So ideally you have more than 14 volume servers, and you can afford to lose 4 volume servers.

	If the number of volume servers is low, the worst case is that you only have 4 volume servers,
	and the shards are spread as 4,4,3,3, respectively. Then you can afford to lose one volume server.

	If you have fewer than 4 volume servers, erasure coding still lets you tolerate up to
	4 corrupted shard files.

	The -collection parameter supports regular expressions for pattern matching:

		- Use exact match:        ec.encode -collection="^mybucket$"
		- Match multiple buckets: ec.encode -collection="bucket.*"
		- Match all collections:  ec.encode -collection=".*"

	Options:

		-verbose: show detailed reasons why volumes are not selected for encoding

	Re-balancing algorithm:

` + ecBalanceAlgorithmDescription
}
func (c *commandEcEncode) HasTag(CommandTag) bool {
	return false
}
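
// Do implements the ec.encode command: it parses the flags, selects candidate
// volumes (or uses the one given via -volumeId), snapshots their replica
// locations, erasure-codes them, re-balances the resulting shards (changes are
// applied only with -rebalance), and finally deletes the original volumes.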
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
	collection := encodeCommand.String("collection", "", "the collection name")
	fullPercentage := encodeCommand.Float64("fullPercent", 95, "select volumes that reach this percentage of the maximum volume size")
	quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes with no writes for this period")
	maxParallelization := encodeCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has fewer than the recommended 4 nodes")
	shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
	applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
	verbose := encodeCommand.Bool("verbose", false, "show detailed reasons why volumes are not selected for encoding")
	if err = encodeCommand.Parse(args); err != nil {
		return nil
	}
	if err = commandEnv.confirmIsLocked(args); err != nil {
		return
	}

	rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement)
	if err != nil {
		return err
	}

	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}

	if !*forceChanges {
		var nodeCount int
		eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
			nodeCount++
		})
		if nodeCount < erasure_coding.ParityShardsCount {
			glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount)
			return nil
		}
	}

	var volumeIds []needle.VolumeId
	var balanceCollections []string
	if vid := needle.VolumeId(*volumeId); vid != 0 {
		// volumeId is provided
		volumeIds = append(volumeIds, vid)
		balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
	} else {
		// apply to all volumes for the given collection pattern (regex)
		volumeIds, balanceCollections, err = collectVolumeIdsForEcEncode(commandEnv, *collection, nil, *fullPercentage, *quietPeriod, *verbose)
		if err != nil {
			return err
		}
	}
	// Collect volume locations BEFORE EC encoding starts, to avoid a race condition
	// where the master metadata is updated after EC encoding but before deletion.
	fmt.Printf("Collecting volume locations for %d volumes before EC encoding...\n", len(volumeIds))
	volumeLocationsMap, err := volumeLocations(commandEnv, volumeIds)
	if err != nil {
		return fmt.Errorf("failed to collect volume locations before EC encoding: %w", err)
	}

	// encode all requested volumes...
	if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
		return fmt.Errorf("ec encode for volumes %v: %w", volumeIds, err)
	}
	// ...re-balance ec shards...
	if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
		return fmt.Errorf("re-balance ec shards for collection(s) %v: %w", balanceCollections, err)
	}
	// ...then delete the original volumes using the pre-collected locations.
	fmt.Printf("Deleting original volumes after EC encoding...\n")
	if err := doDeleteVolumesWithLocations(commandEnv, volumeIds, volumeLocationsMap, *maxParallelization); err != nil {
		return fmt.Errorf("delete original volumes after EC encoding: %w", err)
	}

	fmt.Printf("Successfully completed EC encoding for %d volumes\n", len(volumeIds))
	return nil
}
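
// volumeLocations returns a snapshot of the replica locations currently known
// to the master client for each of the given volumes; it fails if any volume
// has no known locations.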
func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[needle.VolumeId][]wdclient.Location, error) {
	res := map[needle.VolumeId][]wdclient.Location{}
	for _, vid := range volumeIds {
		ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
		if !ok {
			return nil, fmt.Errorf("volume %d not found", vid)
		}
		res[vid] = ls
	}
	return res, nil
}
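
// doEcEncode converts the given volumes into EC shards in three parallelized
// phases: mark every replica read-only, generate the shards on one replica per
// volume, then mount all TotalShardsCount shards on that same replica.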
func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	locations, err := volumeLocations(commandEnv, volumeIds)
	if err != nil {
		return fmt.Errorf("failed to get volume locations for EC encoding: %w", err)
	}

	// mark volumes as readonly
	ewg := NewErrorWaitGroup(maxParallelization)
	for _, vid := range volumeIds {
		for _, l := range locations[vid] {
			ewg.Add(func() error {
				if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, l, false, false); err != nil {
					return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, l.Url, err)
				}
				return nil
			})
		}
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	// generate ec shards, picking one replica per volume in round-robin fashion
	// so the work is spread across servers
	ewg.Reset()
	for i, vid := range volumeIds {
		target := locations[vid][i%len(locations[vid])]
		ewg.Add(func() error {
			if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
				return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
			}
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	// mount all ec shards for each converted volume, on the same replica that generated them
	shardIds := make([]uint32, erasure_coding.TotalShardsCount)
	for i := range shardIds {
		shardIds[i] = uint32(i)
	}
	ewg.Reset()
	for i, vid := range volumeIds {
		target := locations[vid][i%len(locations[vid])]
		ewg.Add(func() error {
			if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
				return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
			}
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	return nil
}
// doDeleteVolumesWithLocations deletes volumes using pre-collected location information.
// This avoids race conditions where master metadata is updated after EC encoding.
func doDeleteVolumesWithLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId, volumeLocationsMap map[needle.VolumeId][]wdclient.Location, maxParallelization int) error {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	ewg := NewErrorWaitGroup(maxParallelization)
	for _, vid := range volumeIds {
		locations, found := volumeLocationsMap[vid]
		if !found {
			fmt.Printf("warning: no locations found for volume %d, skipping deletion\n", vid)
			continue
		}
		for _, l := range locations {
			ewg.Add(func() error {
				if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
					return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
				}
				fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
				return nil
			})
		}
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	return nil
}
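
// generateEcShards asks the volume server holding the volume to generate its
// EC shards, via the VolumeEcShardsGenerate gRPC call.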
func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
	fmt.Printf("generateEcShards %d (collection %q) on %s ...\n", volumeId, collection, sourceVolumeServer)

	err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
		})
		return genErr
	})

	return err
}
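
// collectVolumeIdsForEcEncode scans the topology for volumes eligible for EC
// encoding. A volume qualifies if its collection matches the regex pattern, it
// is not backed by remote storage, it matches sourceDiskType (when given), it
// has seen no writes for quietPeriod, it is more than fullPercentage% full,
// and every disk holding a replica has at least two free volume slots. It
// returns the selected volume ids and the sorted list of matched collections.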
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, collectionPattern string, sourceDiskType *types.DiskType, fullPercentage float64, quietPeriod time.Duration, verbose bool) (vids []needle.VolumeId, matchedCollections []string, err error) {
	// compile regex pattern for collection matching
	collectionRegex, err := compileCollectionPattern(collectionPattern)
	if err != nil {
		return nil, nil, fmt.Errorf("invalid collection pattern '%s': %v", collectionPattern, err)
	}

	// collect topology information
	topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	quietSeconds := int64(quietPeriod / time.Second)
	nowUnixSeconds := time.Now().Unix()

	fmt.Printf("collect volumes with collection pattern '%s', quiet for: %d seconds and %.1f%% full\n", collectionPattern, quietSeconds, fullPercentage)

	// statistics for verbose mode
	var (
		totalVolumes    int
		remoteVolumes   int
		wrongCollection int
		wrongDiskType   int
		tooRecent       int
		tooSmall        int
		noFreeDisk      int
	)
- eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
- for _, diskInfo := range dn.DiskInfos {
- for _, v := range diskInfo.VolumeInfos {
- totalVolumes++
- // ignore remote volumes
- if v.RemoteStorageName != "" && v.RemoteStorageKey != "" {
- remoteVolumes++
- if verbose {
- fmt.Printf("skip volume %d on %s: remote volume (storage: %s, key: %s)\n",
- v.Id, dn.Id, v.RemoteStorageName, v.RemoteStorageKey)
- }
- continue
- }
- // check collection against regex pattern
- if !collectionRegex.MatchString(v.Collection) {
- wrongCollection++
- if verbose {
- fmt.Printf("skip volume %d on %s: collection doesn't match pattern (pattern: %s, actual: %s)\n",
- v.Id, dn.Id, collectionPattern, v.Collection)
- }
- continue
- }
- // track matched collection
- collectionSet[v.Collection] = true
- // check disk type
- if sourceDiskType != nil && types.ToDiskType(v.DiskType) != *sourceDiskType {
- wrongDiskType++
- if verbose {
- fmt.Printf("skip volume %d on %s: wrong disk type (expected: %s, actual: %s)\n",
- v.Id, dn.Id, sourceDiskType.ReadableString(), types.ToDiskType(v.DiskType).ReadableString())
- }
- continue
- }
- // check quiet period
- if v.ModifiedAtSecond+quietSeconds >= nowUnixSeconds {
- tooRecent++
- if verbose {
- fmt.Printf("skip volume %d on %s: too recently modified (last modified: %d seconds ago, required: %d seconds)\n",
- v.Id, dn.Id, nowUnixSeconds-v.ModifiedAtSecond, quietSeconds)
- }
- continue
- }
- // check size
- sizeThreshold := fullPercentage / 100 * float64(volumeSizeLimitMb) * 1024 * 1024
- if float64(v.Size) <= sizeThreshold {
- tooSmall++
- if verbose {
- fmt.Printf("skip volume %d on %s: too small (size: %.1f MB, threshold: %.1f MB, %.1f%% full)\n",
- v.Id, dn.Id, float64(v.Size)/(1024*1024), sizeThreshold/(1024*1024),
- float64(v.Size)*100/(float64(volumeSizeLimitMb)*1024*1024))
- }
- continue
- }
- // check free disk space
- if good, found := vidMap[v.Id]; found {
- if good {
- if diskInfo.FreeVolumeCount < 2 {
- glog.V(0).Infof("skip %s %d on %s, no free disk", v.Collection, v.Id, dn.Id)
- if verbose {
- fmt.Printf("skip volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n",
- v.Id, dn.Id, diskInfo.FreeVolumeCount)
- }
- vidMap[v.Id] = false
- noFreeDisk++
- }
- }
- } else {
- if diskInfo.FreeVolumeCount < 2 {
- glog.V(0).Infof("skip %s %d on %s, no free disk", v.Collection, v.Id, dn.Id)
- if verbose {
- fmt.Printf("skip volume %d on %s: insufficient free disk space (free volumes: %d, required: 2)\n",
- v.Id, dn.Id, diskInfo.FreeVolumeCount)
- }
- vidMap[v.Id] = false
- noFreeDisk++
- } else {
- if verbose {
- fmt.Printf("selected volume %d on %s: size %.1f MB (%.1f%% full), last modified %d seconds ago, free volumes: %d\n",
- v.Id, dn.Id, float64(v.Size)/(1024*1024),
- float64(v.Size)*100/(float64(volumeSizeLimitMb)*1024*1024),
- nowUnixSeconds-v.ModifiedAtSecond, diskInfo.FreeVolumeCount)
- }
- vidMap[v.Id] = true
- }
- }
- }
- }
- })
	for vid, good := range vidMap {
		if good {
			vids = append(vids, needle.VolumeId(vid))
		}
	}

	// convert collection set to a sorted slice
	for collection := range collectionSet {
		matchedCollections = append(matchedCollections, collection)
	}
	sort.Strings(matchedCollections)

	// print summary statistics in verbose mode, or when no volumes were selected
	if verbose || len(vids) == 0 {
		fmt.Printf("\nVolume selection summary:\n")
		fmt.Printf("  Total volumes examined: %d\n", totalVolumes)
		fmt.Printf("  Selected for encoding: %d\n", len(vids))
		fmt.Printf("  Collections matched: %v\n", matchedCollections)
		if totalVolumes > 0 {
			fmt.Printf("\nReasons for exclusion:\n")
			if remoteVolumes > 0 {
				fmt.Printf("  Remote volumes: %d\n", remoteVolumes)
			}
			if wrongCollection > 0 {
				fmt.Printf("  Collection doesn't match pattern: %d\n", wrongCollection)
			}
			if wrongDiskType > 0 {
				fmt.Printf("  Wrong disk type: %d\n", wrongDiskType)
			}
			if tooRecent > 0 {
				fmt.Printf("  Too recently modified: %d\n", tooRecent)
			}
			if tooSmall > 0 {
				fmt.Printf("  Too small (< %.1f%% full): %d\n", fullPercentage, tooSmall)
			}
			if noFreeDisk > 0 {
				fmt.Printf("  Insufficient free disk space: %d\n", noFreeDisk)
			}
		}
		fmt.Println()
	}

	return
}