volume_grpc_erasure_coding.go

package weed_server

import (
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
/*
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
1. client calls VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
2. client asks master for possible servers to hold the ec files
3. client calls VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
5. master stores vid -> [14]*DataNode
6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
*/
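
// A minimal client-side sketch of steps 1 and 3 above, with hypothetical
// server addresses and ids; it assumes the imports used elsewhere in this
// file plus google.golang.org/grpc and google.golang.org/grpc/credentials/insecure.
// Error handling for the second call is elided for brevity:
//
//	func encodeVolumeExample() error {
//		grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
//		source := pb.ServerAddress("10.0.0.1:18080") // hypothetical source volume server
//		target := pb.ServerAddress("10.0.0.2:18080") // hypothetical target volume server
//
//		// step 1: generate .ecx and .ec00 ~ .ec13 on the source server
//		err := operation.WithVolumeServerClient(false, source, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			_, genErr := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
//				VolumeId:   7,
//				Collection: "pictures",
//			})
//			return genErr
//		})
//		if err != nil {
//			return err
//		}
//
//		// step 3: copy some shards, plus the .ecx/.ecj/.vif files, to a target server
//		return operation.WithVolumeServerClient(false, target, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			_, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
//				VolumeId:       7,
//				Collection:     "pictures",
//				ShardIds:       []uint32{0, 1, 2},
//				CopyEcxFile:    true,
//				CopyEcjFile:    true,
//				CopyVifFile:    true,
//				SourceDataNode: string(source),
//			})
//			return copyErr
//		})
//	}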
// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {

	glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)

	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if v == nil {
		return nil, fmt.Errorf("volume %d not found", req.VolumeId)
	}
	baseFileName := v.DataFileName()

	if v.Collection != req.Collection {
		return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
	}

	shouldCleanup := true
	defer func() {
		if !shouldCleanup {
			return
		}
		for i := 0; i < erasure_coding.TotalShardsCount; i++ {
			os.Remove(fmt.Sprintf("%s.ec%02d", baseFileName, i))
		}
		os.Remove(v.IndexFileName() + ".ecx")
	}()

	// write .ec00 ~ .ec13 files
	if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
		return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
	}

	// write .ecx file
	if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
		return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
	}

	// write .vif file
	var expireAtSec uint64
	if v.Ttl != nil {
		ttlSecond := v.Ttl.ToSeconds()
		if ttlSecond > 0 {
			expireAtSec = uint64(time.Now().Unix()) + ttlSecond // calculated expiration time
		}
	}
	volumeInfo := &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}
	volumeInfo.ExpireAtSec = expireAtSec
	datSize, _, _ := v.FileStat()
	volumeInfo.DatFileSize = int64(datSize)
	if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil {
		return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
	}

	shouldCleanup = false

	return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
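
// After a successful VolumeEcShardsGenerate, the volume's directories hold the
// 14 shard files plus the sorted index and volume info. A minimal sketch
// (hypothetical base name) that lists these artifacts; note the .ecx file sits
// next to the .idx file, which may live in a separate index directory:
//
//	func listEcArtifactsExample() {
//		baseFileName := "/data/pictures_7" // hypothetical data-file base name
//		for i := 0; i < erasure_coding.TotalShardsCount; i++ {
//			shardFileName := baseFileName + erasure_coding.ToExt(i)
//			fmt.Println(shardFileName, util.FileExists(shardFileName))
//		}
//		fmt.Println(baseFileName+".ecx", util.FileExists(baseFileName+".ecx"))
//		fmt.Println(baseFileName+".vif", util.FileExists(baseFileName+".vif"))
//	}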
// VolumeEcShardsRebuild regenerates any missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {

	glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)

	baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))

	var rebuiltShardIds []uint32

	for _, location := range vs.store.Locations {
		_, _, existingShardCount, err := checkEcVolumeStatus(baseFileName, location)
		if err != nil {
			return nil, err
		}
		if existingShardCount == 0 {
			continue
		}

		if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
			// write .ec00 ~ .ec13 files
			dataBaseFileName := path.Join(location.Directory, baseFileName)
			if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
				return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
			} else {
				rebuiltShardIds = generatedShardIds
			}

			indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
			if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
				return nil, fmt.Errorf("RebuildEcxFile %s: %v", indexBaseFileName, err)
			}

			break
		}
	}

	return &volume_server_pb.VolumeEcShardsRebuildResponse{
		RebuiltShardIds: rebuiltShardIds,
	}, nil
}
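
// A minimal sketch, assuming a server that already holds enough shards locally
// to reconstruct the rest: rebuild the missing shards, then mount whatever was
// regenerated (address and ids hypothetical):
//
//	func rebuildExample(server pb.ServerAddress, grpcDialOption grpc.DialOption) error {
//		return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			resp, err := client.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
//				VolumeId:   7,
//				Collection: "pictures",
//			})
//			if err != nil {
//				return err
//			}
//			_, err = client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
//				VolumeId:   7,
//				Collection: "pictures",
//				ShardIds:   resp.RebuiltShardIds,
//			})
//			return err
//		})
//	}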
// VolumeEcShardsCopy copies the .ecx and some ec data slices
func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {

	glog.V(0).Infof("VolumeEcShardsCopy: %v", req)

	var location *storage.DiskLocation

	// Use disk_id if provided (disk-aware storage)
	if req.DiskId > 0 || (req.DiskId == 0 && len(vs.store.Locations) > 0) {
		// Validate disk ID is within bounds
		if int(req.DiskId) >= len(vs.store.Locations) {
			return nil, fmt.Errorf("invalid disk_id %d: only have %d disks", req.DiskId, len(vs.store.Locations))
		}
		// Use the specific disk location
		location = vs.store.Locations[req.DiskId]
		glog.V(1).Infof("Using disk %d for EC shard copy: %s", req.DiskId, location.Directory)
	} else {
		// Fallback to old behavior for backward compatibility
		if req.CopyEcxFile {
			location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
				return location.DiskType == types.HardDriveType
			})
		} else {
			location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
				return true
			})
		}
		if location == nil {
			return nil, fmt.Errorf("no space left")
		}
	}

	dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
	indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))

	err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {

		// copy ec data slices
		for _, shardId := range req.ShardIds {
			if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
				return err
			}
		}

		if req.CopyEcxFile {
			// copy ecx file
			if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
				return err
			}
		}

		if req.CopyEcjFile {
			// copy ecj file
			if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
				return err
			}
		}

		if req.CopyVifFile {
			// copy vif file
			if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
				return err
			}
		}

		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
	}

	return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
}
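
// After a copy succeeds, the target typically mounts the new shards (which
// reports them to the master, step 4 above), and when rebalancing the source
// then unmounts and deletes its local copies. A sketch with hypothetical
// addresses and shard ids:
//
//	func migrateShardsExample(source, target pb.ServerAddress, grpcDialOption grpc.DialOption) error {
//		shardIds := []uint32{0, 1, 2}
//		err := operation.WithVolumeServerClient(false, target, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
//				VolumeId: 7, Collection: "pictures", ShardIds: shardIds,
//			})
//			return mountErr
//		})
//		if err != nil {
//			return err
//		}
//		return operation.WithVolumeServerClient(false, source, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			if _, unmountErr := client.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
//				VolumeId: 7, ShardIds: shardIds,
//			}); unmountErr != nil {
//				return unmountErr
//			}
//			_, delErr := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
//				VolumeId: 7, Collection: "pictures", ShardIds: shardIds,
//			})
//			return delErr
//		})
//	}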
// VolumeEcShardsDelete deletes the local .ecx file and some ec data slices if they are no longer needed.
// The shards should not be mounted before calling this.
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {

	bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))

	glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)

	for _, location := range vs.store.Locations {
		if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
			glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err)
			return nil, err
		}
	}

	return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}

func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {

	found := false

	indexBaseFilename := path.Join(location.IdxDirectory, bName)
	dataBaseFilename := path.Join(location.Directory, bName)

	if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
		for _, shardId := range shardIds {
			shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
			if util.FileExists(shardFileName) {
				found = true
				os.Remove(shardFileName)
			}
		}
	}

	if !found {
		return nil
	}

	hasEcxFile, hasIdxFile, existingShardCount, err := checkEcVolumeStatus(bName, location)
	if err != nil {
		return err
	}

	if hasEcxFile && existingShardCount == 0 {
		if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
			return err
		}
		os.Remove(indexBaseFilename + ".ecj")

		if !hasIdxFile {
			// .vif is used for ec volumes and normal volumes
			os.Remove(dataBaseFilename + ".vif")
		}
	}

	return nil
}
func checkEcVolumeStatus(bName string, location *storage.DiskLocation) (hasEcxFile bool, hasIdxFile bool, existingShardCount int, err error) {
	// check whether to delete the .ecx and .ecj file also
	fileInfos, err := os.ReadDir(location.Directory)
	if err != nil {
		return false, false, 0, err
	}
	if location.IdxDirectory != location.Directory {
		idxFileInfos, err := os.ReadDir(location.IdxDirectory)
		if err != nil {
			return false, false, 0, err
		}
		fileInfos = append(fileInfos, idxFileInfos...)
	}
	for _, fileInfo := range fileInfos {
		if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
			hasEcxFile = true
			continue
		}
		if fileInfo.Name() == bName+".idx" {
			hasIdxFile = true
			continue
		}
		if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
			existingShardCount++
		}
	}
	return hasEcxFile, hasIdxFile, existingShardCount, nil
}
// VolumeEcShardsMount mounts the ec shards on this server
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {

	glog.V(0).Infof("VolumeEcShardsMount: %v", req)

	for _, shardId := range req.ShardIds {
		if err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)); err != nil {
			glog.Errorf("ec shard mount %v: %v", req, err)
			return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
		}
		glog.V(2).Infof("ec shard mount %v", req)
	}

	return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
}
// VolumeEcShardsUnmount unmounts the ec shards from this server
func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {

	glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)

	for _, shardId := range req.ShardIds {
		if err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)); err != nil {
			glog.Errorf("ec shard unmount %v: %v", req, err)
			return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
		}
		glog.V(2).Infof("ec shard unmount %v", req)
	}

	return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
}
// VolumeEcShardRead streams a byte range of one ec shard back to the client
func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {

	ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
	if !found {
		return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
	}
	ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
	if !found {
		return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
	}

	if req.FileKey != 0 {
		_, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
		if size.IsDeleted() {
			return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
				IsDeleted: true,
			})
		}
	}

	bufSize := req.Size
	if bufSize > BufferSizeLimit {
		bufSize = BufferSizeLimit
	}
	buffer := make([]byte, bufSize)

	startOffset, bytesToRead := req.Offset, req.Size

	for bytesToRead > 0 {
		// min of bytesToRead and bufSize
		bufferSize := bufSize
		if bufferSize > bytesToRead {
			bufferSize = bytesToRead
		}
		bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)

		// println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
		if bytesread > 0 {

			if int64(bytesread) > bytesToRead {
				bytesread = int(bytesToRead)
			}
			err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
				Data: buffer[:bytesread],
			})
			if err != nil {
				// println("sending", bytesread, "bytes err", err.Error())
				return err
			}

			startOffset += int64(bytesread)
			bytesToRead -= int64(bytesread)
		}

		if err != nil {
			if err != io.EOF {
				return err
			}
			return nil
		}
	}

	return nil
}
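
// A minimal consumer sketch for this stream: read Size bytes starting at
// Offset from one shard, accumulating the chunks (ids and range hypothetical).
// Note streamingMode is true here, matching the copy code above:
//
//	func readShardRangeExample(server pb.ServerAddress, grpcDialOption grpc.DialOption) ([]byte, error) {
//		var data []byte
//		err := operation.WithVolumeServerClient(true, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			stream, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
//				VolumeId: 7,
//				ShardId:  0,
//				Offset:   0,
//				Size:     1024,
//			})
//			if err != nil {
//				return err
//			}
//			for {
//				resp, recvErr := stream.Recv()
//				if recvErr == io.EOF {
//					return nil
//				}
//				if recvErr != nil {
//					return recvErr
//				}
//				if resp.IsDeleted {
//					return fmt.Errorf("needle is deleted")
//				}
//				data = append(data, resp.Data...)
//			}
//		})
//		return data, err
//	}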
// VolumeEcBlobDelete marks one needle as deleted in the local ec volume
func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {

	glog.V(0).Infof("VolumeEcBlobDelete: %v", req)

	resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}

	for _, location := range vs.store.Locations {
		if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {

			_, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
			if err != nil {
				return nil, fmt.Errorf("locate in local ec volume: %w", err)
			}
			if size.IsDeleted() {
				return resp, nil
			}

			err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
			if err != nil {
				return nil, err
			}

			break
		}
	}

	return resp, nil
}
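
// A sketch deleting one blob from an ec volume (key and version hypothetical;
// Version must match the needle version the volume was written with):
//
//	func deleteEcBlobExample(server pb.ServerAddress, grpcDialOption grpc.DialOption) error {
//		return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			_, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
//				VolumeId: 7,
//				FileKey:  0x1234, // hypothetical needle key
//				Version:  3,      // hypothetical needle version
//			})
//			return err
//		})
//	}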
// VolumeEcShardsToVolume generates the .idx, .dat files from the .ecx, .ecj and .ec00 ~ .ec09 files
func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {

	glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)

	// collect .ec00 ~ .ec09 files
	shardFileNames := make([]string, erasure_coding.DataShardsCount)
	v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
	if !found {
		return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
	}

	if v.Collection != req.Collection {
		return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
	}

	for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
		if shardFileNames[shardId] == "" {
			return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
		}
	}

	dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()

	// calculate .dat file size
	datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
	if err != nil {
		return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
	}

	// write .dat file from .ec00 ~ .ec09 files
	if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize, shardFileNames); err != nil {
		return nil, fmt.Errorf("WriteDatFile %s: %v", dataBaseFileName, err)
	}

	// write .idx file from .ecx and .ecj files
	if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
		return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", indexBaseFileName, err)
	}

	return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
}
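
// A decode sketch, assuming one server already holds all of .ec00 ~ .ec09 plus
// .ecx/.ecj: regenerate .dat/.idx, then drop the ec shards and mount the
// regular volume. The ids are hypothetical, and a real flow would unmount and
// delete only the shards this server actually holds:
//
//	func decodeExample(server pb.ServerAddress, grpcDialOption grpc.DialOption) error {
//		return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			if _, err := client.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
//				VolumeId: 7, Collection: "pictures",
//			}); err != nil {
//				return err
//			}
//			var allShardIds []uint32
//			for i := uint32(0); i < uint32(erasure_coding.TotalShardsCount); i++ {
//				allShardIds = append(allShardIds, i)
//			}
//			if _, err := client.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
//				VolumeId: 7, ShardIds: allShardIds,
//			}); err != nil {
//				return err
//			}
//			if _, err := client.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
//				VolumeId: 7, Collection: "pictures", ShardIds: allShardIds,
//			}); err != nil {
//				return err
//			}
//			_, err := client.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{VolumeId: 7})
//			return err
//		})
//	}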
// VolumeEcShardsInfo returns the ids and sizes of the ec shards held locally for one volume
func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
	glog.V(0).Infof("VolumeEcShardsInfo: volume %d", req.VolumeId)

	var ecShardInfos []*volume_server_pb.EcShardInfo

	// Find the EC volume
	for _, location := range vs.store.Locations {
		if v, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
			// Get shard details from the EC volume
			shardDetails := v.ShardDetails()
			for _, shardDetail := range shardDetails {
				ecShardInfo := &volume_server_pb.EcShardInfo{
					ShardId:    uint32(shardDetail.ShardId),
					Size:       int64(shardDetail.Size),
					Collection: v.Collection,
				}
				ecShardInfos = append(ecShardInfos, ecShardInfo)
			}
			break
		}
	}

	return &volume_server_pb.VolumeEcShardsInfoResponse{
		EcShardInfos: ecShardInfos,
	}, nil
}
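
// A quick way to inspect the shard sizes on one server (address hypothetical):
//
//	func printShardInfoExample(server pb.ServerAddress, grpcDialOption grpc.DialOption) error {
//		return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
//			resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{VolumeId: 7})
//			if err != nil {
//				return err
//			}
//			for _, info := range resp.EcShardInfos {
//				fmt.Printf("shard %d: %d bytes (collection %q)\n", info.ShardId, info.Size, info.Collection)
//			}
//			return nil
//		})
//	}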