read_parquet_to_log.go

package logstore

import (
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"strings"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"google.golang.org/protobuf/proto"
)

var (
	chunkCache = chunk_cache.NewChunkCacheInMemory(256) // 256 entries, 8MB max per entry
)
// isControlEntry checks whether a log entry is a control entry that carries
// no actual data. Based on the MQ system's behavior, control entries are:
//  1. DataMessages with a populated Ctrl field (publisher close signals)
//  2. Entries with empty keys (the same filter the subscriber applies)
//  3. Entries with no data
//
// See the illustrative sketch after this function.
func isControlEntry(logEntry *filer_pb.LogEntry) bool {
	// Skip entries with no data.
	if len(logEntry.Data) == 0 {
		return true
	}
	// Skip entries with empty keys (same logic as the subscriber).
	if len(logEntry.Key) == 0 {
		return true
	}
	// Check whether this is a DataMessage with the control field populated.
	dataMessage := &mq_pb.DataMessage{}
	if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
		// A populated control field marks a control message.
		if dataMessage.Ctrl != nil {
			return true
		}
	}
	return false
}
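
// Illustrative sketch (not part of the production path): a publisher-close
// signal arrives as a DataMessage whose Ctrl field is set, so isControlEntry
// reports true for it even when the entry has a key and non-empty data. The
// ControlMessage literal below is an assumption about the mq_pb schema:
//
//	ctrlMsg := &mq_pb.DataMessage{Ctrl: &mq_pb.ControlMessage{}}
//	data, _ := proto.Marshal(ctrlMsg)
//	entry := &filer_pb.LogEntry{Key: []byte("k"), Data: data}
//	_ = isControlEntry(entry) // true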

func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	partitionDir := topic.PartitionDir(t, p)
	lookupFileIdFn := filer.LookupFn(filerClient)

	// Read the topic configuration from the filer.
	var topicConf *mq_pb.ConfigureTopicResponse
	var err error
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		topicConf, err = t.ReadConfFile(client)
		return err
	}); err != nil {
		// Return a no-op function for test environments or when the topic config can't be read.
		return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
			return startPosition, true, nil
		}
	}
	recordType := topicConf.GetRecordType()
	if recordType == nil {
		// Return a no-op function if no schema is available.
		return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
			return startPosition, true, nil
		}
	}
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		// Return a no-op function rather than nil, so callers never invoke a
		// nil read function (consistent with the error branches above).
		return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
			return startPosition, true, nil
		}
	}
	// eachFileFn reads one parquet file and calls eachLogEntryFn for each log entry.
	eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, startTsNs, stopTsNs int64) (processedTsNs int64, err error) {
		// Create a readerAt for the parquet file.
		fileSize := filer.FileSize(entry)
		visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
		chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
		readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
		readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))

		// Create the parquet reader and read rows in batches.
		parquetReader := parquet.NewReader(readerAt)
		rows := make([]parquet.Row, 128)
		for {
			rowCount, readErr := parquetReader.ReadRows(rows)

			// Process the returned rows first, even if EOF was also returned.
			for i := 0; i < rowCount; i++ {
				row := rows[i]
				// Convert the parquet row to a schema_pb.RecordValue.
				recordValue, err := schema.ToRecordValue(recordType, parquetLevels, row)
				if err != nil {
					return processedTsNs, fmt.Errorf("ToRecordValue failed: %w", err)
				}
				processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
				if processedTsNs <= startTsNs {
					continue
				}
				if stopTsNs != 0 && processedTsNs >= stopTsNs {
					return processedTsNs, nil
				}
				data, marshalErr := proto.Marshal(recordValue)
				if marshalErr != nil {
					return processedTsNs, fmt.Errorf("marshal record value: %w", marshalErr)
				}
				logEntry := &filer_pb.LogEntry{
					Key:  recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
					TsNs: processedTsNs,
					Data: data,
				}
				// Skip control entries that carry no actual data.
				if isControlEntry(logEntry) {
					continue
				}
				// fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
				if _, err = eachLogEntryFn(logEntry); err != nil {
					return processedTsNs, fmt.Errorf("process log entry %v: %w", logEntry, err)
				}
			}

			// Check the end conditions after processing the rows.
			if readErr != nil {
				if readErr == io.EOF {
					return processedTsNs, nil
				}
				return processedTsNs, readErr
			}
			if rowCount == 0 {
				return processedTsNs, nil
			}
		}
	}
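
	// The per-file "min"/"max" extended attributes consulted below are assumed
	// to be stamped by the parquet writer as 8-byte big-endian nanosecond
	// timestamps covering the file's row range, e.g. (writer-side sketch, not
	// part of this file):
	//
	//	buf := make([]byte, 8)
	//	binary.BigEndian.PutUint64(buf, uint64(minTsNs))
	//	entry.Extended["min"] = buf
	//
	// That convention is what lets the listing below skip whole files by
	// timestamp range without opening them.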
	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
		startTsNs := startPosition.Time.UnixNano()
		var processedTsNs int64

		err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
			return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
				if entry.IsDirectory {
					return nil
				}
				if !strings.HasSuffix(entry.Name, ".parquet") {
					return nil
				}
				if len(entry.Extended) == 0 {
					return nil
				}
				// Read the minimum timestamp from the parquet file's metadata.
				minTsBytes := entry.Extended["min"]
				if len(minTsBytes) != 8 {
					return nil
				}
				minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))
				// Read the maximum timestamp.
				maxTsBytes := entry.Extended["max"]
				if len(maxTsBytes) != 8 {
					return nil
				}
				maxTsNs := int64(binary.BigEndian.Uint64(maxTsBytes))
				// Stop if this file starts at or after the stop timestamp.
				if stopTsNs != 0 && stopTsNs <= minTsNs {
					isDone = true
					return nil
				}
				// Skip files that end before the start timestamp.
				if maxTsNs < startTsNs {
					return nil
				}
				if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
					return err
				}
				return nil
			}, startFileName, true, math.MaxInt32)
		})
		lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
		return
	}
}
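
// Usage sketch (illustrative only; assumes a connected filerClient and an
// existing topic/partition, and that the callback follows the
// log_buffer.EachLogEntryFuncType signature used above, returning
// (isDone bool, err error)):
//
//	readFn := GenParquetReadFunc(filerClient, t, p)
//	startPos := log_buffer.NewMessagePosition(0, -2)
//	lastPos, isDone, err := readFn(startPos, 0, func(logEntry *filer_pb.LogEntry) (bool, error) {
//		fmt.Printf("key=%s ts=%d\n", logEntry.Key, logEntry.TsNs)
//		return false, nil
//	})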