volume_backup.go

package storage

import (
	"context"
	"fmt"
	"io"
	"os"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
)

// GetVolumeSyncStatus reports this volume's tail offset, index file size,
// compaction revision, TTL, replication setting, and version, so that a
// replica can decide how to catch up.
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
	v.dataFileAccessLock.RLock()
	defer v.dataFileAccessLock.RUnlock()
	var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
	if datSize, _, err := v.DataBackend.GetStat(); err == nil {
		syncStatus.TailOffset = uint64(datSize)
	}
	syncStatus.VolumeId = uint32(v.Id)
	syncStatus.Collection = v.Collection
	syncStatus.IdxFileSize = v.nm.IndexFileSize()
	syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision)
	syncStatus.Ttl = v.SuperBlock.Ttl.String()
	syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
	syncStatus.Version = uint32(v.SuperBlock.Version)
	return syncStatus
}
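
// Illustrative sketch, not part of the original file: before attempting an incremental
// copy, a backup client could compare its own sync status with the master's to apply
// the Step 0 checks described in the comment below. The helper name canCopyIncrementally
// is an assumption for illustration; the real checks live in command/backup.go.
func canCopyIncrementally(local, master *volume_server_pb.VolumeSyncStatusResponse) bool {
	// An incremental copy only makes sense when both sides are at the same
	// compaction revision and the local tail is not ahead of the master's.
	return local.CompactRevision == master.CompactRevision &&
		local.TailOffset <= master.TailOffset
}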

// The volume syncs with a master volume in 2 steps:
// 1. The slave checks the master side to find the subscription checkpoint
//    to set up the replication.
// 2. The slave receives the updates from the master.
/*
Assume the slave volume needs to follow the master volume.
The master volume could be compacted, and could be many files ahead of the
slave volume.

Step 0: // implemented in command/backup.go, to avoid dat file size overflow.
	0.1 If the slave compact revision is less than the master's, do a local compaction, and set
	    the local compact revision to be the same as the master's.
	0.2 If the slave size is still bigger than the master's, discard the local copy and do a full copy.
Step 1:
	The slave volume asks the master with its last modification time t.
	The master does a binary search in the volume (using .idx as an array, and checking the appendAtNs in the .dat file)
	to find the first entry with appendAtNs > t.
Step 2:
	The master sends content bytes to the slave. The bytes are not chunked by needle.
Step 3:
	The slave generates the needle map for the new bytes. (This may be optimized to incrementally
	update the needle map when receiving new .dat bytes. But that seems unnecessary for now.)
*/

// IncrementalBackup pulls the bytes appended on the master volume after this
// volume's last appendAtNs, appends them to the local .dat file, and then
// regenerates the needle map entries for the newly written region.
func (v *Volume) IncrementalBackup(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption) error {
	startFromOffset, _, _ := v.FileStat()
	appendAtNs, err := v.findLastAppendAtNs()
	if err != nil {
		return err
	}

	writeOffset := int64(startFromOffset)

	err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {

		stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
			VolumeId: uint32(v.Id),
			SinceNs:  appendAtNs,
		})
		if err != nil {
			return err
		}

		for {
			resp, recvErr := stream.Recv()
			if recvErr != nil {
				if recvErr == io.EOF {
					break
				} else {
					return recvErr
				}
			}

			n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset)
			if writeErr != nil {
				return writeErr
			}
			writeOffset += int64(n)
		}

		return nil
	})
	if err != nil {
		return err
	}

	// add the newly appended entries to the needle map
	return ScanVolumeFileFrom(v.Version(), v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
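
// Illustrative sketch, not part of the original file: a periodic sync loop on the
// slave side might drive the incremental backup roughly like this. The helper name
// syncVolumeFromMaster and its control flow are assumptions for illustration; the
// Step 0 compaction / full-copy handling described above is omitted here and lives
// in command/backup.go.
func syncVolumeFromMaster(v *Volume, master pb.ServerAddress, dialOpt grpc.DialOption) error {
	// Steps 1-3 (checkpoint lookup, byte streaming, needle map regeneration)
	// are all handled inside IncrementalBackup.
	if err := v.IncrementalBackup(master, dialOpt); err != nil {
		return fmt.Errorf("incremental backup of volume %d from %s: %v", v.Id, master, err)
	}
	return nil
}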

// findLastAppendAtNs returns the appendAtNs timestamp of the last entry recorded
// in the .idx file, or 0 when the volume is empty.
func (v *Volume) findLastAppendAtNs() (uint64, error) {
	offset, err := v.locateLastAppendEntry()
	if err != nil {
		return 0, err
	}
	if offset.IsZero() {
		return 0, nil
	}
	return v.readAppendAtNs(offset)
}

// locateLastAppendEntry reads the last entry of the .idx file and returns the
// .dat offset it points to.
func (v *Volume) locateLastAppendEntry() (Offset, error) {
	indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
	if e != nil {
		return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e)
	}
	defer indexFile.Close()

	fi, err := indexFile.Stat()
	if err != nil {
		return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err)
	}
	fileSize := fi.Size()
	if fileSize%NeedleMapEntrySize != 0 {
		return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
	}
	if fileSize == 0 {
		return Offset{}, nil
	}

	bytes := make([]byte, NeedleMapEntrySize)
	n, e := indexFile.ReadAt(bytes, fileSize-NeedleMapEntrySize)
	if n != NeedleMapEntrySize {
		return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e)
	}

	_, offset, _ := idx.IdxFileEntry(bytes)

	return offset, nil
}

// readAppendAtNs reads the needle at the given offset in the .dat file and
// returns its AppendAtNs timestamp.
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
	n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset())
	if err != nil {
		return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err)
	}
	_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength)
	if err != nil {
		return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err)
	}
	return n.AppendAtNs, nil
}

// BinarySearchByAppendAtNs runs on the server side. It binary-searches the .idx
// entries, reading each candidate's appendAtNs from the .dat file, to find the
// first entry with appendAtNs > sinceNs. isLast is true when no such entry exists.
func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
	fileSize := int64(v.IndexFileSize())
	if fileSize%NeedleMapEntrySize != 0 {
		err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize)
		return
	}

	entryCount := fileSize / NeedleMapEntrySize
	l := int64(0)
	h := entryCount

	for l < h {
		m := (l + h) / 2
		if m == entryCount {
			return Offset{}, true, nil
		}

		// read the appendAtNs for entry m
		offset, err = v.readOffsetFromIndex(m)
		if err != nil {
			err = fmt.Errorf("read entry %d: %v", m, err)
			return
		}

		if offset.IsZero() {
			// entry m refers to a deleted needle; probe the nearest live
			// neighbors on both sides and narrow the range by their timestamps
			leftIndex, _, leftNs, leftErr := v.readLeftNs(m)
			if leftErr != nil {
				err = leftErr
				return
			}
			rightIndex, rightOffset, rightNs, rightErr := v.readRightNs(m, entryCount)
			if rightErr != nil {
				err = rightErr
				return
			}
			if rightNs <= sinceNs {
				l = rightIndex
				if l == entryCount {
					return Offset{}, true, nil
				} else {
					continue
				}
			}
			if sinceNs < leftNs {
				h = leftIndex + 1
				continue
			}
			return rightOffset, false, nil
		}
		if offset.IsZero() {
			return Offset{}, true, nil
		}

		mNs, nsReadErr := v.readAppendAtNs(offset)
		if nsReadErr != nil {
			err = fmt.Errorf("read entry %d offset %d: %v", m, offset.ToActualOffset(), nsReadErr)
			return
		}

		// move the boundary
		if mNs <= sinceNs {
			l = m + 1
		} else {
			h = m
		}
	}

	if l == entryCount {
		return Offset{}, true, nil
	}

	offset, err = v.readOffsetFromIndex(l)
	return offset, false, err
}
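
// Illustrative sketch, not part of the original file: the server-side handler for
// VolumeIncrementalCopy could use the binary search above to decide where to start
// streaming. The helper name incrementalCopyStartOffset is an assumption for
// illustration only.
func incrementalCopyStartOffset(v *Volume, sinceNs uint64) (start int64, upToDate bool, err error) {
	offset, isLast, searchErr := v.BinarySearchByAppendAtNs(sinceNs)
	if searchErr != nil || isLast {
		// isLast means every live entry was appended at or before sinceNs,
		// so the slave is already caught up.
		return 0, isLast, searchErr
	}
	// stream the .dat file to the slave starting from this byte offset
	return offset.ToActualOffset(), false, nil
}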

// readRightNs scans right from entry m (exclusive) up to max for the first
// non-deleted entry and returns its index, offset, and appendAtNs timestamp.
func (v *Volume) readRightNs(m, max int64) (index int64, offset Offset, ts uint64, err error) {
	index = m
	for offset.IsZero() {
		index++
		if index >= max {
			return
		}
		offset, err = v.readOffsetFromIndex(index)
		if err != nil {
			err = fmt.Errorf("read right entry at %d: %v", index, err)
			return
		}
	}
	if !offset.IsZero() {
		ts, err = v.readAppendAtNs(offset)
	}
	return
}

// readLeftNs scans left from entry m (exclusive) for the first non-deleted
// entry and returns its index, offset, and appendAtNs timestamp.
func (v *Volume) readLeftNs(m int64) (index int64, offset Offset, ts uint64, err error) {
	index = m
	for offset.IsZero() {
		index--
		if index < 0 {
			return
		}
		offset, err = v.readOffsetFromIndex(index)
		if err != nil {
			err = fmt.Errorf("read left entry at %d: %v", index, err)
			return
		}
	}
	if !offset.IsZero() {
		ts, err = v.readAppendAtNs(offset)
	}
	return
}

// readOffsetFromIndex returns the .dat offset stored in the m-th index entry.
func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
	v.dataFileAccessLock.RLock()
	defer v.dataFileAccessLock.RUnlock()
	if v.nm == nil {
		return Offset{}, io.EOF
	}
	_, offset, _, err := v.nm.ReadIndexEntry(m)
	return offset, err
}

// generate the volume idx
type VolumeFileScanner4GenIdx struct {
	v *Volume
}

func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error {
	return nil
}

func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
	return false
}

func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
	if n.Size > 0 && n.Size.IsValid() {
		return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
	}
	return scanner.v.nm.Delete(n.Id, ToOffset(offset))
}