ec_volume.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package erasure_coding
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "os"
  7. "slices"
  8. "sync"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
  18. )
  // Package-level values shared by the EC volume helpers in this file.
  var (
  	// NotFoundError is the sentinel returned by SearchNeedleFromSortedIndex
  	// when the requested needle id is absent from the sorted index.
  	NotFoundError = errors.New("needle not found")

  	// destroyDelaySeconds is the grace period, in seconds, that
  	// IsTimeToDestroy adds to ExpireAtSec before a volume counts as expired.
  	destroyDelaySeconds int64
  )
  23. type EcVolume struct {
  24. VolumeId needle.VolumeId
  25. Collection string
  26. dir string
  27. dirIdx string
  28. ecxFile *os.File
  29. ecxFileSize int64
  30. ecxCreatedAt time.Time
  31. Shards []*EcVolumeShard
  32. ShardLocations map[ShardId][]pb.ServerAddress
  33. ShardLocationsRefreshTime time.Time
  34. ShardLocationsLock sync.RWMutex
  35. Version needle.Version
  36. ecjFile *os.File
  37. ecjFileAccessLock sync.Mutex
  38. diskType types.DiskType
  39. datFileSize int64
  40. ExpireAtSec uint64 //ec volume destroy time, calculated from the ec volume was created
  41. }
  42. func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  43. ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
  44. dataBaseFileName := EcShardFileName(collection, dir, int(vid))
  45. indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
  46. // open ecx file
  47. if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
  48. return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
  49. }
  50. ecxFi, statErr := ev.ecxFile.Stat()
  51. if statErr != nil {
  52. _ = ev.ecxFile.Close()
  53. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
  54. }
  55. ev.ecxFileSize = ecxFi.Size()
  56. ev.ecxCreatedAt = ecxFi.ModTime()
  57. // open ecj file
  58. if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
  59. return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
  60. }
  61. // read volume info
  62. ev.Version = needle.Version3
  63. if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
  64. ev.Version = needle.Version(volumeInfo.Version)
  65. ev.datFileSize = volumeInfo.DatFileSize
  66. ev.ExpireAtSec = volumeInfo.ExpireAtSec
  67. } else {
  68. glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
  69. volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
  70. }
  71. ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
  72. return
  73. }
  74. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  75. for _, s := range ev.Shards {
  76. if s.ShardId == ecVolumeShard.ShardId {
  77. return false
  78. }
  79. }
  80. ev.Shards = append(ev.Shards, ecVolumeShard)
  81. slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) int {
  82. if a.VolumeId != b.VolumeId {
  83. return int(a.VolumeId - b.VolumeId)
  84. }
  85. return int(a.ShardId - b.ShardId)
  86. })
  87. return true
  88. }
  89. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  90. foundPosition := -1
  91. for i, s := range ev.Shards {
  92. if s.ShardId == shardId {
  93. foundPosition = i
  94. }
  95. }
  96. if foundPosition < 0 {
  97. return nil, false
  98. }
  99. ecVolumeShard = ev.Shards[foundPosition]
  100. ecVolumeShard.Unmount()
  101. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  102. return ecVolumeShard, true
  103. }
  104. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  105. for _, s := range ev.Shards {
  106. if s.ShardId == shardId {
  107. return s, true
  108. }
  109. }
  110. return nil, false
  111. }
  112. func (ev *EcVolume) Close() {
  113. for _, s := range ev.Shards {
  114. s.Close()
  115. }
  116. if ev.ecjFile != nil {
  117. ev.ecjFileAccessLock.Lock()
  118. _ = ev.ecjFile.Close()
  119. ev.ecjFile = nil
  120. ev.ecjFileAccessLock.Unlock()
  121. }
  122. if ev.ecxFile != nil {
  123. _ = ev.ecxFile.Sync()
  124. _ = ev.ecxFile.Close()
  125. ev.ecxFile = nil
  126. }
  127. }
  128. func (ev *EcVolume) Destroy() {
  129. ev.Close()
  130. for _, s := range ev.Shards {
  131. s.Destroy()
  132. }
  133. os.Remove(ev.FileName(".ecx"))
  134. os.Remove(ev.FileName(".ecj"))
  135. os.Remove(ev.FileName(".vif"))
  136. }
  137. func (ev *EcVolume) FileName(ext string) string {
  138. switch ext {
  139. case ".ecx", ".ecj":
  140. return ev.IndexBaseFileName() + ext
  141. }
  142. // .vif
  143. return ev.DataBaseFileName() + ext
  144. }
  145. func (ev *EcVolume) DataBaseFileName() string {
  146. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  147. }
  148. func (ev *EcVolume) IndexBaseFileName() string {
  149. return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
  150. }
  151. func (ev *EcVolume) ShardSize() uint64 {
  152. if len(ev.Shards) > 0 {
  153. return uint64(ev.Shards[0].Size())
  154. }
  155. return 0
  156. }
  157. func (ev *EcVolume) Size() (size uint64) {
  158. for _, shard := range ev.Shards {
  159. if shardSize := shard.Size(); shardSize > 0 {
  160. size += uint64(shardSize)
  161. }
  162. }
  163. return
  164. }
  165. func (ev *EcVolume) CreatedAt() time.Time {
  166. return ev.ecxCreatedAt
  167. }
  168. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  169. for _, s := range ev.Shards {
  170. shardIds = append(shardIds, s.ShardId)
  171. }
  172. return
  173. }
  174. type ShardInfo struct {
  175. ShardId ShardId
  176. Size uint64
  177. }
  178. func (ev *EcVolume) ShardDetails() (shards []ShardInfo) {
  179. for _, s := range ev.Shards {
  180. shardSize := s.Size()
  181. if shardSize >= 0 {
  182. shards = append(shards, ShardInfo{
  183. ShardId: s.ShardId,
  184. Size: uint64(shardSize),
  185. })
  186. }
  187. }
  188. return
  189. }
  190. func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages []*master_pb.VolumeEcShardInformationMessage) {
  191. prevVolumeId := needle.VolumeId(math.MaxUint32)
  192. var m *master_pb.VolumeEcShardInformationMessage
  193. for _, s := range ev.Shards {
  194. if s.VolumeId != prevVolumeId {
  195. m = &master_pb.VolumeEcShardInformationMessage{
  196. Id: uint32(s.VolumeId),
  197. Collection: s.Collection,
  198. DiskType: string(ev.diskType),
  199. ExpireAtSec: ev.ExpireAtSec,
  200. DiskId: diskId,
  201. }
  202. messages = append(messages, m)
  203. }
  204. prevVolumeId = s.VolumeId
  205. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  206. // Add shard size information using the optimized format
  207. SetShardSize(m, s.ShardId, s.Size())
  208. }
  209. return
  210. }
  211. func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
  212. // find the needle from ecx file
  213. offset, size, err = ev.FindNeedleFromEcx(needleId)
  214. if err != nil {
  215. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %w", err)
  216. }
  217. intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
  218. return
  219. }
  220. func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
  221. shard := ev.Shards[0]
  222. // Usually shard will be padded to round of ErasureCodingSmallBlockSize.
  223. // So in most cases, if shardSize equals to n * ErasureCodingLargeBlockSize,
  224. // the data would be in small blocks.
  225. shardSize := shard.ecdFileSize - 1
  226. if ev.datFileSize > 0 {
  227. // To get the correct LargeBlockRowsCount
  228. // use datFileSize to calculate the shardSize to match the EC encoding logic.
  229. shardSize = ev.datFileSize / DataShardsCount
  230. }
  231. // calculate the locations in the ec shards
  232. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardSize, offset, types.Size(needle.GetActualSize(size, version)))
  233. return
  234. }
  235. func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
  236. return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
  237. }
  238. func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
  239. var key types.NeedleId
  240. buf := make([]byte, types.NeedleMapEntrySize)
  241. l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
  242. for l < h {
  243. m := (l + h) / 2
  244. if n, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  245. if n != types.NeedleMapEntrySize {
  246. return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
  247. }
  248. }
  249. key, offset, size = idx.IdxFileEntry(buf)
  250. if key == needleId {
  251. if processNeedleFn != nil {
  252. err = processNeedleFn(ecxFile, m*types.NeedleMapEntrySize)
  253. }
  254. return
  255. }
  256. if key < needleId {
  257. l = m + 1
  258. } else {
  259. h = m
  260. }
  261. }
  262. err = NotFoundError
  263. return
  264. }
  265. func (ev *EcVolume) IsTimeToDestroy() bool {
  266. return ev.ExpireAtSec > 0 && time.Now().Unix() > (int64(ev.ExpireAtSec)+destroyDelaySeconds)
  267. }