| 12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- package logstore
- import (
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- )
- func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
- fromParquetFn := GenParquetReadFunc(filerClient, t, p)
- readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
- // Reversed order: live logs first (recent), then Parquet files (historical)
- // This provides better performance for real-time analytics queries
- return mergeReadFuncs(readLogDirectFn, fromParquetFn)
- }
- func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
- var exhaustedLiveLogs bool
- var lastProcessedPosition log_buffer.MessagePosition
- return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- if !exhaustedLiveLogs {
- // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
- lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
- // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
- if isDone {
- isDone = false
- }
- if err != nil {
- return
- }
- lastProcessedPosition = lastReadPosition
- }
- exhaustedLiveLogs = true
- if startPosition.Before(lastProcessedPosition.Time) {
- startPosition = lastProcessedPosition
- }
- // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
- lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
- return
- }
- }
|