store_ec.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "slices"
  8. "sync"
  9. "time"
  10. "github.com/klauspost/reedsolomon"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/stats"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  19. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  20. )
  21. func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
  22. var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
  23. collectionEcShardSize := make(map[string]int64)
  24. for diskId, location := range s.Locations {
  25. location.ecVolumesLock.RLock()
  26. for _, ecShards := range location.ecVolumes {
  27. ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage(uint32(diskId))...)
  28. for _, ecShard := range ecShards.Shards {
  29. collectionEcShardSize[ecShards.Collection] += ecShard.Size()
  30. }
  31. }
  32. location.ecVolumesLock.RUnlock()
  33. }
  34. for col, size := range collectionEcShardSize {
  35. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size))
  36. }
  37. return &master_pb.Heartbeat{
  38. EcShards: ecShardMessages,
  39. HasNoEcShards: len(ecShardMessages) == 0,
  40. }
  41. }
  42. func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  43. for diskId, location := range s.Locations {
  44. if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil {
  45. glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId)
  46. var shardBits erasure_coding.ShardBits
  47. s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{
  48. Id: uint32(vid),
  49. Collection: collection,
  50. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  51. DiskType: string(location.DiskType),
  52. ExpireAtSec: ecVolume.ExpireAtSec,
  53. DiskId: uint32(diskId),
  54. }
  55. return nil
  56. } else if err == os.ErrNotExist {
  57. continue
  58. } else {
  59. return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
  60. }
  61. }
  62. return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
  63. }
  64. func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
  65. diskId, ecShard, found := s.findEcShard(vid, shardId)
  66. if !found {
  67. return nil
  68. }
  69. var shardBits erasure_coding.ShardBits
  70. message := master_pb.VolumeEcShardInformationMessage{
  71. Id: uint32(vid),
  72. Collection: ecShard.Collection,
  73. EcIndexBits: uint32(shardBits.AddShardId(shardId)),
  74. DiskType: string(ecShard.DiskType),
  75. DiskId: diskId,
  76. }
  77. location := s.Locations[diskId]
  78. if deleted := location.UnloadEcShard(vid, shardId); deleted {
  79. glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
  80. s.DeletedEcShardsChan <- message
  81. return nil
  82. }
  83. return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
  84. }
  85. func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (diskId uint32, shard *erasure_coding.EcVolumeShard, found bool) {
  86. for diskId, location := range s.Locations {
  87. if v, found := location.FindEcShard(vid, shardId); found {
  88. return uint32(diskId), v, found
  89. }
  90. }
  91. return 0, nil, false
  92. }
  93. func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
  94. for _, location := range s.Locations {
  95. if s, found := location.FindEcVolume(vid); found {
  96. return s, true
  97. }
  98. }
  99. return nil, false
  100. }
  101. // shardFiles is a list of shard files, which is used to return the shard locations
  102. func (s *Store) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
  103. for _, location := range s.Locations {
  104. if s, foundShards := location.CollectEcShards(vid, shardFileNames); foundShards {
  105. ecVolume = s
  106. found = true
  107. }
  108. }
  109. return
  110. }
  111. func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
  112. for _, location := range s.Locations {
  113. location.DestroyEcVolume(vid)
  114. }
  115. }
  116. func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error) {
  117. for _, location := range s.Locations {
  118. if localEcVolume, found := location.FindEcVolume(vid); found {
  119. offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version)
  120. if err != nil {
  121. return 0, fmt.Errorf("locate in local ec volume: %w", err)
  122. }
  123. if size.IsDeleted() {
  124. return 0, ErrorDeleted
  125. }
  126. if onReadSizeFn != nil {
  127. onReadSizeFn(size)
  128. }
  129. glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
  130. if len(intervals) > 1 {
  131. glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
  132. }
  133. bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
  134. if err != nil {
  135. return 0, fmt.Errorf("ReadEcShardIntervals: %w", err)
  136. }
  137. if isDeleted {
  138. return 0, ErrorDeleted
  139. }
  140. err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version)
  141. if err != nil {
  142. return 0, fmt.Errorf("readbytes: %w", err)
  143. }
  144. return len(bytes), nil
  145. }
  146. }
  147. return 0, fmt.Errorf("ec shard %d not found", vid)
  148. }
  149. func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  150. if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
  151. return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
  152. }
  153. for i, interval := range intervals {
  154. if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil {
  155. return nil, isDeleted, e
  156. } else {
  157. if isDeleted {
  158. is_deleted = true
  159. }
  160. if i == 0 {
  161. data = d
  162. } else {
  163. data = append(data, d...)
  164. }
  165. }
  166. }
  167. return
  168. }
  169. func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
  170. shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  171. data = make([]byte, interval.Size)
  172. if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
  173. var readSize int
  174. if readSize, err = shard.ReadAt(data, actualOffset); err != nil {
  175. if readSize != int(interval.Size) {
  176. glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
  177. return
  178. }
  179. }
  180. } else {
  181. ecVolume.ShardLocationsLock.RLock()
  182. sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
  183. ecVolume.ShardLocationsLock.RUnlock()
  184. // try reading directly
  185. if hasShardIdLocation {
  186. _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
  187. if err == nil {
  188. return
  189. }
  190. glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
  191. }
  192. // try reading by recovering from other shards
  193. _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
  194. if err == nil {
  195. return
  196. }
  197. glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
  198. }
  199. return
  200. }
  201. func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) {
  202. // failed to access the source data nodes, clear it up
  203. ecVolume.ShardLocationsLock.Lock()
  204. delete(ecVolume.ShardLocations, shardId)
  205. ecVolume.ShardLocationsLock.Unlock()
  206. }
  207. func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
  208. shardCount := len(ecVolume.ShardLocations)
  209. if shardCount < erasure_coding.DataShardsCount &&
  210. ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) ||
  211. shardCount == erasure_coding.TotalShardsCount &&
  212. ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) ||
  213. shardCount >= erasure_coding.DataShardsCount &&
  214. ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) {
  215. // still fresh
  216. return nil
  217. }
  218. glog.V(3).Infof("lookup and cache ec volume %d locations", ecVolume.VolumeId)
  219. err = operation.WithMasterServerClient(false, s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  220. req := &master_pb.LookupEcVolumeRequest{
  221. VolumeId: uint32(ecVolume.VolumeId),
  222. }
  223. resp, err := masterClient.LookupEcVolume(context.Background(), req)
  224. if err != nil {
  225. return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
  226. }
  227. if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount {
  228. return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount)
  229. }
  230. ecVolume.ShardLocationsLock.Lock()
  231. for _, shardIdLocations := range resp.ShardIdLocations {
  232. shardId := erasure_coding.ShardId(shardIdLocations.ShardId)
  233. delete(ecVolume.ShardLocations, shardId)
  234. for _, loc := range shardIdLocations.Locations {
  235. ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], pb.NewServerAddressFromLocation(loc))
  236. }
  237. }
  238. ecVolume.ShardLocationsRefreshTime = time.Now()
  239. ecVolume.ShardLocationsLock.Unlock()
  240. return nil
  241. })
  242. return
  243. }
  244. func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  245. if len(sourceDataNodes) == 0 {
  246. return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
  247. }
  248. for _, sourceDataNode := range sourceDataNodes {
  249. glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
  250. n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
  251. if err == nil {
  252. return
  253. }
  254. glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  255. }
  256. return
  257. }
  258. func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  259. err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  260. // copy data slice
  261. shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
  262. VolumeId: uint32(vid),
  263. ShardId: uint32(shardId),
  264. Offset: offset,
  265. Size: int64(len(buf)),
  266. FileKey: uint64(needleId),
  267. })
  268. if err != nil {
  269. return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  270. }
  271. for {
  272. resp, receiveErr := shardReadClient.Recv()
  273. if receiveErr == io.EOF {
  274. break
  275. }
  276. if receiveErr != nil {
  277. return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
  278. }
  279. if resp.IsDeleted {
  280. is_deleted = true
  281. }
  282. copy(buf[n:n+len(resp.Data)], resp.Data)
  283. n += len(resp.Data)
  284. }
  285. return nil
  286. })
  287. if err != nil {
  288. return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
  289. }
  290. return
  291. }
  292. func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
  293. glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  294. enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
  295. if err != nil {
  296. return 0, false, fmt.Errorf("failed to create encoder: %w", err)
  297. }
  298. bufs := make([][]byte, erasure_coding.TotalShardsCount)
  299. var wg sync.WaitGroup
  300. ecVolume.ShardLocationsLock.RLock()
  301. for shardId, locations := range ecVolume.ShardLocations {
  302. // skip current shard or empty shard
  303. if shardId == shardIdToRecover {
  304. continue
  305. }
  306. if len(locations) == 0 {
  307. glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
  308. continue
  309. }
  310. // read from remote locations
  311. wg.Add(1)
  312. go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) {
  313. defer wg.Done()
  314. data := make([]byte, len(buf))
  315. nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
  316. if readErr != nil {
  317. glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
  318. forgetShardId(ecVolume, shardId)
  319. }
  320. if isDeleted {
  321. is_deleted = true
  322. }
  323. if nRead == len(buf) {
  324. bufs[shardId] = data
  325. }
  326. }(shardId, locations)
  327. }
  328. ecVolume.ShardLocationsLock.RUnlock()
  329. wg.Wait()
  330. if err = enc.ReconstructData(bufs); err != nil {
  331. glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
  332. return 0, false, err
  333. }
  334. glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
  335. copy(buf, bufs[shardIdToRecover])
  336. return len(buf), is_deleted, nil
  337. }
  338. func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) {
  339. for _, location := range s.Locations {
  340. location.ecVolumesLock.RLock()
  341. for _, v := range location.ecVolumes {
  342. ecVolumes = append(ecVolumes, v)
  343. }
  344. location.ecVolumesLock.RUnlock()
  345. }
  346. slices.SortFunc(ecVolumes, func(a, b *erasure_coding.EcVolume) int {
  347. return int(a.VolumeId) - int(b.VolumeId)
  348. })
  349. return ecVolumes
  350. }