// command_volume_tier_upload.go
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/operation"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  15. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  16. )
// init registers the volume.tier.upload command with the shell's
// global command list so it is discoverable at runtime.
func init() {
	Commands = append(Commands, &commandVolumeTierUpload{})
}
// commandVolumeTierUpload is a stateless shell command that moves a
// volume's .dat file to a remote storage tier (see Help for details).
type commandVolumeTierUpload struct {
}
// Name returns the shell command name used to invoke this command.
func (c *commandVolumeTierUpload) Name() string {
	return "volume.tier.upload"
}
// Help returns the usage text shown by the shell's help command.
// NOTE(review): the raw string's original indentation was lost in this
// copy of the file; it is reconstructed here with conventional layout —
// confirm against the upstream source.
func (c *commandVolumeTierUpload) Help() string {
	return `upload the dat file of a volume to a remote tier

	volume.tier.upload [-collection=""] [-fullPercent=95] [-quietFor=1h]
	volume.tier.upload [-collection=""] -volumeId=<volume_id> -dest=<storage_backend> [-keepLocalDatFile]

	e.g.:
	volume.tier.upload -volumeId=7 -dest=s3
	volume.tier.upload -volumeId=7 -dest=s3.default

	The <storage_backend> is defined in master.toml.
	For example, "s3.default" in [storage.backend.s3.default]

	This command will move the dat file of a volume to a remote tier.

	SeaweedFS enables scalable and fast local access to lots of files,
	and the cloud storage is slower by cost efficient. How to combine them together?

	Usually the data follows 80/20 rule: only 20% of data is frequently accessed.
	We can offload the old volumes to the cloud.

	With this, SeaweedFS can be both fast and scalable, and infinite storage space.
	Just add more local SeaweedFS volume servers to increase the throughput.

	The index file is still local, and the same O(1) disk read is applied to the remote file.

`
}
// HasTag reports whether this command carries the given tag; this
// command has none.
func (c *commandVolumeTierUpload) HasTag(CommandTag) bool {
	return false
}
  47. func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  48. tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  49. volumeId := tierCommand.Int("volumeId", 0, "the volume id")
  50. collection := tierCommand.String("collection", "", "the collection name")
  51. fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  52. quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
  53. dest := tierCommand.String("dest", "", "the target tier name")
  54. keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file")
  55. disk := tierCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  56. if err = tierCommand.Parse(args); err != nil {
  57. return nil
  58. }
  59. if err = commandEnv.confirmIsLocked(args); err != nil {
  60. return
  61. }
  62. vid := needle.VolumeId(*volumeId)
  63. // volumeId is provided
  64. if vid != 0 {
  65. return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
  66. }
  67. var diskType *types.DiskType
  68. if disk != nil {
  69. _diskType := types.ToDiskType(*disk)
  70. diskType = &_diskType
  71. }
  72. // apply to all volumes in the collection
  73. // reusing collectVolumeIdsForEcEncode for now
  74. volumeIds, _, err := collectVolumeIdsForEcEncode(commandEnv, *collection, diskType, *fullPercentage, *quietPeriod, false)
  75. if err != nil {
  76. return err
  77. }
  78. fmt.Printf("tier upload volumes: %v\n", volumeIds)
  79. for _, vid := range volumeIds {
  80. if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
  81. return err
  82. }
  83. }
  84. return nil
  85. }
  86. func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
  87. // find volume location
  88. topoInfo, _, err := collectTopologyInfo(commandEnv, 0)
  89. if err != nil {
  90. return fmt.Errorf("collect topology info: %v", err)
  91. }
  92. var existingLocations []wdclient.Location
  93. eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
  94. for _, disk := range dn.DiskInfos {
  95. for _, vi := range disk.VolumeInfos {
  96. if needle.VolumeId(vi.Id) == vid && (collection == "" || vi.Collection == collection) {
  97. fmt.Printf("find volume %d from Url:%s, GrpcPort:%d, DC:%s\n", vid, dn.Id, dn.GrpcPort, string(dc))
  98. existingLocations = append(existingLocations, wdclient.Location{
  99. Url: dn.Id,
  100. PublicUrl: dn.Id,
  101. GrpcPort: int(dn.GrpcPort),
  102. DataCenter: string(dc),
  103. })
  104. }
  105. }
  106. }
  107. })
  108. if len(existingLocations) == 0 {
  109. if collection == "" {
  110. return fmt.Errorf("volume %d not found", vid)
  111. }
  112. return fmt.Errorf("volume %d not found in collection %s", vid, collection)
  113. }
  114. err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
  115. if err != nil {
  116. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err)
  117. }
  118. // copy the .dat file to remote tier
  119. err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, vid, collection, existingLocations[0].ServerAddress(), dest, keepLocalDatFile)
  120. if err != nil {
  121. return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, existingLocations[0].Url, dest, err)
  122. }
  123. if keepLocalDatFile {
  124. return nil
  125. }
  126. // now the first replica has the .idx and .vif files.
  127. // ask replicas on other volume server to delete its own local copy
  128. for i, location := range existingLocations {
  129. if i == 0 {
  130. continue
  131. }
  132. fmt.Printf("delete volume %d from Url:%s\n", vid, location.Url)
  133. err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false)
  134. if err != nil {
  135. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
  136. }
  137. }
  138. return nil
  139. }
  140. func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {
  141. err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  142. stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
  143. VolumeId: uint32(volumeId),
  144. Collection: collection,
  145. DestinationBackendName: dest,
  146. KeepLocalDatFile: keepLocalDatFile,
  147. })
  148. if stream == nil {
  149. if copyErr == nil {
  150. // when the volume is already uploaded, VolumeTierMoveDatToRemote will return nil stream and nil error
  151. // so we should directly return in this caseAdd commentMore actions
  152. fmt.Fprintf(writer, "volume %v already uploaded", volumeId)
  153. return nil
  154. } else {
  155. return copyErr
  156. }
  157. }
  158. var lastProcessed int64
  159. for {
  160. resp, recvErr := stream.Recv()
  161. if recvErr != nil {
  162. if recvErr == io.EOF {
  163. break
  164. } else {
  165. return recvErr
  166. }
  167. }
  168. processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0
  169. fmt.Fprintf(writer, "copied %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed)
  170. lastProcessed = resp.Processed
  171. }
  172. return copyErr
  173. })
  174. return err
  175. }