// merged_read.go
  1. package logstore
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  6. )
  7. func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
  8. fromParquetFn := GenParquetReadFunc(filerClient, t, p)
  9. readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
  10. // Reversed order: live logs first (recent), then Parquet files (historical)
  11. // This provides better performance for real-time analytics queries
  12. return mergeReadFuncs(readLogDirectFn, fromParquetFn)
  13. }
  14. func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
  15. var exhaustedLiveLogs bool
  16. var lastProcessedPosition log_buffer.MessagePosition
  17. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
  18. if !exhaustedLiveLogs {
  19. // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
  20. lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
  21. // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
  22. if isDone {
  23. isDone = false
  24. }
  25. if err != nil {
  26. return
  27. }
  28. lastProcessedPosition = lastReadPosition
  29. }
  30. exhaustedLiveLogs = true
  31. if startPosition.Before(lastProcessedPosition.Time) {
  32. startPosition = lastProcessedPosition
  33. }
  34. // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
  35. lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
  36. return
  37. }
  38. }