filechunks.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package filer
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  7. "math"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. )
  11. func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
  12. for _, c := range chunks {
  13. t := uint64(c.Offset + int64(c.Size))
  14. if size < t {
  15. size = t
  16. }
  17. }
  18. return
  19. }
  20. func FileSize(entry *filer_pb.Entry) (size uint64) {
  21. if entry == nil || entry.Attributes == nil {
  22. return 0
  23. }
  24. fileSize := entry.Attributes.FileSize
  25. if entry.RemoteEntry != nil {
  26. if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime {
  27. fileSize = maxUint64(fileSize, uint64(entry.RemoteEntry.RemoteSize))
  28. }
  29. }
  30. return maxUint64(TotalSize(entry.GetChunks()), fileSize)
  31. }
  32. func ETag(entry *filer_pb.Entry) (etag string) {
  33. if entry.Attributes == nil || entry.Attributes.Md5 == nil {
  34. return ETagChunks(entry.GetChunks())
  35. }
  36. return fmt.Sprintf("%x", entry.Attributes.Md5)
  37. }
  38. func ETagEntry(entry *Entry) (etag string) {
  39. if entry.IsInRemoteOnly() {
  40. return entry.Remote.RemoteETag
  41. }
  42. if entry.Attr.Md5 == nil {
  43. return ETagChunks(entry.GetChunks())
  44. }
  45. return fmt.Sprintf("%x", entry.Attr.Md5)
  46. }
  47. func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
  48. if len(chunks) == 1 {
  49. return fmt.Sprintf("%x", util.Base64Md5ToBytes(chunks[0].ETag))
  50. }
  51. var md5Digests [][]byte
  52. for _, c := range chunks {
  53. md5Digests = append(md5Digests, util.Base64Md5ToBytes(c.ETag))
  54. }
  55. return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks))
  56. }
  57. func CompactFileChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
  58. visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
  59. compacted, garbage = SeparateGarbageChunks(visibles, chunks)
  60. return
  61. }
  62. func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*filer_pb.FileChunk) (compacted []*filer_pb.FileChunk, garbage []*filer_pb.FileChunk) {
  63. fileIds := make(map[string]bool)
  64. for x := visibles.Front(); x != nil; x = x.Next {
  65. interval := x.Value
  66. fileIds[interval.fileId] = true
  67. }
  68. for _, chunk := range chunks {
  69. if _, found := fileIds[chunk.GetFileIdString()]; found {
  70. compacted = append(compacted, chunk)
  71. } else {
  72. garbage = append(garbage, chunk)
  73. }
  74. }
  75. return compacted, garbage
  76. }
  77. func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{}) {
  78. garbageFileIds = make(map[string]struct{})
  79. for x := visibles.Front(); x != nil; x = x.Next {
  80. interval := x.Value
  81. offset := interval.start - interval.offsetInChunk
  82. if start <= offset && offset+int64(interval.chunkSize) <= stop {
  83. garbageFileIds[interval.fileId] = struct{}{}
  84. }
  85. }
  86. return
  87. }
  88. func MinusChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
  89. aData, aMeta, aErr := ResolveChunkManifest(ctx, lookupFileIdFn, as, 0, math.MaxInt64)
  90. if aErr != nil {
  91. return nil, aErr
  92. }
  93. bData, bMeta, bErr := ResolveChunkManifest(ctx, lookupFileIdFn, bs, 0, math.MaxInt64)
  94. if bErr != nil {
  95. return nil, bErr
  96. }
  97. delta = append(delta, DoMinusChunks(aData, bData)...)
  98. delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
  99. return
  100. }
  101. func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
  102. fileIds := make(map[string]bool)
  103. for _, interval := range bs {
  104. fileIds[interval.GetFileIdString()] = true
  105. }
  106. for _, chunk := range as {
  107. if _, found := fileIds[chunk.GetFileIdString()]; !found {
  108. delta = append(delta, chunk)
  109. }
  110. }
  111. return
  112. }
// DoMinusChunksBySourceFileId returns the chunks from as that are not
// referenced by bs, matching against both a chunk's own file id and its
// source file id.
func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
	fileIds := make(map[string]bool)
	for _, interval := range bs {
		// Index every id under which a chunk in bs may be known.
		fileIds[interval.GetFileIdString()] = true
		fileIds[interval.GetSourceFileId()] = true
	}
	for _, chunk := range as {
		// NOTE(review): the lookup side uses GetFileId() while the index
		// side above uses GetFileIdString() — confirm these two forms
		// always agree for the chunks passed here.
		_, sourceFileIdFound := fileIds[chunk.GetSourceFileId()]
		_, fileIdFound := fileIds[chunk.GetFileId()]
		if !sourceFileIdFound && !fileIdFound {
			delta = append(delta, chunk)
		}
	}
	return
}
// ChunkView describes the portion of one stored chunk that is visible
// at a particular offset range of a file.
type ChunkView struct {
	FileId        string
	OffsetInChunk int64  // offset within the chunk
	ViewSize      uint64 // number of bytes of the chunk exposed by this view
	ViewOffset    int64  // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
	ChunkSize     uint64 // total size of the backing chunk
	CipherKey     []byte // encryption key for the chunk, if any
	IsGzipped     bool   // whether the stored chunk content is compressed
	ModifiedTsNs  int64  // modification timestamp (ns) of the originating chunk
}
  138. func (cv *ChunkView) SetStartStop(start, stop int64) {
  139. cv.OffsetInChunk += start - cv.ViewOffset
  140. cv.ViewOffset = start
  141. cv.ViewSize = uint64(stop - start)
  142. }
  143. func (cv *ChunkView) Clone() IntervalValue {
  144. return &ChunkView{
  145. FileId: cv.FileId,
  146. OffsetInChunk: cv.OffsetInChunk,
  147. ViewSize: cv.ViewSize,
  148. ViewOffset: cv.ViewOffset,
  149. ChunkSize: cv.ChunkSize,
  150. CipherKey: cv.CipherKey,
  151. IsGzipped: cv.IsGzipped,
  152. ModifiedTsNs: cv.ModifiedTsNs,
  153. }
  154. }
  155. func (cv *ChunkView) IsFullChunk() bool {
  156. // IsFullChunk returns true if the view covers the entire chunk from the beginning.
  157. // This prevents bandwidth amplification when range requests happen to align
  158. // with chunk boundaries but don't actually want the full chunk.
  159. return cv.OffsetInChunk == 0 && cv.ViewSize == cv.ChunkSize
  160. }
  161. func ViewFromChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
  162. visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, offset, offset+size)
  163. return ViewFromVisibleIntervals(visibles, offset, size)
  164. }
  165. func ViewFromVisibleIntervals(visibles *IntervalList[*VisibleInterval], offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
  166. stop := offset + size
  167. if size == math.MaxInt64 {
  168. stop = math.MaxInt64
  169. }
  170. if stop < offset {
  171. stop = math.MaxInt64
  172. }
  173. chunkViews = NewIntervalList[*ChunkView]()
  174. for x := visibles.Front(); x != nil; x = x.Next {
  175. chunk := x.Value
  176. chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
  177. if chunkStart < chunkStop {
  178. chunkView := &ChunkView{
  179. FileId: chunk.fileId,
  180. OffsetInChunk: chunkStart - chunk.start + chunk.offsetInChunk,
  181. ViewSize: uint64(chunkStop - chunkStart),
  182. ViewOffset: chunkStart,
  183. ChunkSize: chunk.chunkSize,
  184. CipherKey: chunk.cipherKey,
  185. IsGzipped: chunk.isGzipped,
  186. ModifiedTsNs: chunk.modifiedTsNs,
  187. }
  188. chunkViews.AppendInterval(&Interval[*ChunkView]{
  189. StartOffset: chunkStart,
  190. StopOffset: chunkStop,
  191. TsNs: chunk.modifiedTsNs,
  192. Value: chunkView,
  193. Prev: nil,
  194. Next: nil,
  195. })
  196. }
  197. }
  198. return chunkViews
  199. }
  200. func MergeIntoVisibles(visibles *IntervalList[*VisibleInterval], start int64, stop int64, chunk *filer_pb.FileChunk) {
  201. newV := &VisibleInterval{
  202. start: start,
  203. stop: stop,
  204. fileId: chunk.GetFileIdString(),
  205. modifiedTsNs: chunk.ModifiedTsNs,
  206. offsetInChunk: start - chunk.Offset, // the starting position in the chunk
  207. chunkSize: chunk.Size, // size of the chunk
  208. cipherKey: chunk.CipherKey,
  209. isGzipped: chunk.IsCompressed,
  210. }
  211. visibles.InsertInterval(start, stop, chunk.ModifiedTsNs, newV)
  212. }
  213. func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop int64, chunk *filer_pb.FileChunk) {
  214. chunkView := &ChunkView{
  215. FileId: chunk.GetFileIdString(),
  216. OffsetInChunk: start - chunk.Offset,
  217. ViewSize: uint64(stop - start),
  218. ViewOffset: start,
  219. ChunkSize: chunk.Size,
  220. CipherKey: chunk.CipherKey,
  221. IsGzipped: chunk.IsCompressed,
  222. ModifiedTsNs: chunk.ModifiedTsNs,
  223. }
  224. chunkViews.InsertInterval(start, stop, chunk.ModifiedTsNs, chunkView)
  225. }
// NonOverlappingVisibleIntervals translates the file chunks into a list of
// non-overlapping VisibleIntervals in memory. Chunks whose content is a chunk
// manifest are first resolved (restricted to [startOffset, stopOffset)) into
// their underlying data chunks.
// NOTE(review): visibility is then computed over the full [0, MaxInt64)
// range rather than the requested window — presumably intentional, with only
// manifest resolution being range-limited; confirm against readResolvedChunks.
func NonOverlappingVisibleIntervals(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
	chunks, _, err = ResolveChunkManifest(ctx, lookupFileIdFn, chunks, startOffset, stopOffset)
	if err != nil {
		return
	}
	visibles2 := readResolvedChunks(chunks, 0, math.MaxInt64)
	return visibles2, err
}
// VisibleInterval is a non-overlapping visible interval of a file;
// each interval maps to exactly one file chunk.
type VisibleInterval struct {
	start         int64  // start offset of the interval in file coordinates
	stop          int64  // stop offset (exclusive) in file coordinates
	modifiedTsNs  int64  // modification timestamp (ns) of the backing chunk
	fileId        string // id of the backing chunk
	offsetInChunk int64  // where this interval begins inside the chunk
	chunkSize     uint64 // total size of the backing chunk
	cipherKey     []byte // encryption key for the chunk, if any
	isGzipped     bool   // whether the stored chunk content is compressed
}
  248. func (v *VisibleInterval) SetStartStop(start, stop int64) {
  249. v.offsetInChunk += start - v.start
  250. v.start, v.stop = start, stop
  251. }
  252. func (v *VisibleInterval) Clone() IntervalValue {
  253. return &VisibleInterval{
  254. start: v.start,
  255. stop: v.stop,
  256. modifiedTsNs: v.modifiedTsNs,
  257. fileId: v.fileId,
  258. offsetInChunk: v.offsetInChunk,
  259. chunkSize: v.chunkSize,
  260. cipherKey: v.cipherKey,
  261. isGzipped: v.isGzipped,
  262. }
  263. }
  264. func min(x, y int64) int64 {
  265. if x <= y {
  266. return x
  267. }
  268. return y
  269. }
  270. func max(x, y int64) int64 {
  271. if x <= y {
  272. return y
  273. }
  274. return x
  275. }