- package engine
- import (
- "container/heap"
- "context"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/parquet-go/parquet-go"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
- "github.com/seaweedfs/seaweedfs/weed/mq/schema"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
- "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
- "google.golang.org/protobuf/proto"
- )
- // HybridMessageScanner scans all data sources for a topic:
- // 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
- // 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
- // 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
- // Results from all three sources are merged chronologically, providing a complete
- // real-time view of every message in the topic.
- type HybridMessageScanner struct {
- filerClient filer_pb.FilerClient
- brokerClient BrokerClientInterface // For querying unflushed data
- topic topic.Topic
- recordSchema *schema_pb.RecordType
- parquetLevels *schema.ParquetLevels
- engine *SQLEngine // Reference for system column formatting
- }
- // NewHybridMessageScanner creates a scanner that reads from all data sources
- // This provides complete real-time message coverage including unflushed data
- func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
- // Check if filerClient is available
- if filerClient == nil {
- return nil, fmt.Errorf("filerClient is required but not available")
- }
- // Create topic reference
- t := topic.Topic{
- Namespace: namespace,
- Name: topicName,
- }
- // Get topic schema from broker client (works with both real and mock clients)
- recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
- if err != nil {
- return nil, fmt.Errorf("failed to get topic schema: %v", err)
- }
- if recordType == nil {
- return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
- }
- // Create a copy of the recordType to avoid modifying the original
- recordTypeCopy := &schema_pb.RecordType{
- Fields: make([]*schema_pb.Field, len(recordType.Fields)),
- }
- copy(recordTypeCopy.Fields, recordType.Fields)
- // Add system columns that MQ adds to all records
- recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
- WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
- WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
- RecordTypeEnd()
- // Convert to Parquet levels for efficient reading
- parquetLevels, err := schema.ToParquetLevels(recordType)
- if err != nil {
- return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
- }
- return &HybridMessageScanner{
- filerClient: filerClient,
- brokerClient: brokerClient,
- topic: t,
- recordSchema: recordType,
- parquetLevels: parquetLevels,
- engine: engine,
- }, nil
- }
- // HybridScanOptions configure how the scanner reads from both live and archived data
- type HybridScanOptions struct {
- // Time range filtering (Unix nanoseconds)
- StartTimeNs int64
- StopTimeNs int64
- // Column projection - if empty, select all columns
- Columns []string
- // Row limit - 0 means no limit
- Limit int
- // Row offset - 0 means no offset
- Offset int
- // Predicate for WHERE clause filtering
- Predicate func(*schema_pb.RecordValue) bool
- }
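- // Illustrative sketch (not part of the original code): how these options might be populated
- // for a query such as SELECT id, name FROM t WHERE id > 100 LIMIT 10 OFFSET 5.
- // The columns "id" and "name" are hypothetical examples.
- //
- //	opts := HybridScanOptions{
- //		Columns: []string{"id", "name"},
- //		Limit:   10,
- //		Offset:  5,
- //		Predicate: func(r *schema_pb.RecordValue) bool {
- //			v, ok := r.Fields["id"]
- //			return ok && v.GetInt64Value() > 100
- //		},
- //	}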
- // HybridScanResult represents a message from either live logs or Parquet files
- type HybridScanResult struct {
- Values map[string]*schema_pb.Value // Column name -> value
- Timestamp int64 // Message timestamp (_ts_ns)
- Key []byte // Message key (_key)
- Source string // "live_log" or "parquet_archive" or "in_memory_broker"
- }
- // HybridScanStats contains statistics about data sources scanned
- type HybridScanStats struct {
- BrokerBufferQueried bool
- BrokerBufferMessages int
- BufferStartIndex int64
- PartitionsScanned int
- LiveLogFilesScanned int // Number of live log files processed
- }
- // ParquetColumnStats holds statistics for a single column from parquet metadata
- type ParquetColumnStats struct {
- ColumnName string
- MinValue *schema_pb.Value
- MaxValue *schema_pb.Value
- NullCount int64
- RowCount int64
- }
- // ParquetFileStats holds aggregated statistics for a parquet file
- type ParquetFileStats struct {
- FileName string
- RowCount int64
- ColumnStats map[string]*ParquetColumnStats
- // Optional file-level timestamp range from filer extended attributes
- MinTimestampNs int64
- MaxTimestampNs int64
- }
- // getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) from the _ts_ns column statistics, falling back to the file-level timestamp range
- func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
- if fileStats == nil {
- return 0, 0, false
- }
- // Prefer column stats for _ts_ns if present
- if len(fileStats.ColumnStats) > 0 {
- if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
- if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
- if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
- return minNs, maxNs, true
- }
- }
- }
- }
- // Fallback to file-level range if present in filer extended metadata
- if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
- return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
- }
- return 0, 0, false
- }
- // schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
- func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
- if v == nil {
- return 0, false
- }
- switch k := v.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- return k.Int64Value, true
- case *schema_pb.Value_Int32Value:
- return int64(k.Int32Value), true
- default:
- return 0, false
- }
- }
- // StreamingDataSource provides a streaming interface for reading scan results
- type StreamingDataSource interface {
- Next() (*HybridScanResult, error) // Returns next result or nil when done
- HasMore() bool // Returns true if more data available
- Close() error // Clean up resources
- }
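- // Illustrative sketch (an assumption, not from the original code): a typical consumer drains a
- // StreamingDataSource like this, stopping on error or when Next returns nil:
- //
- //	for src.HasMore() {
- //		r, err := src.Next()
- //		if err != nil {
- //			_ = src.Close()
- //			return err
- //		}
- //		if r == nil {
- //			break
- //		}
- //		// process r ...
- //	}
- //	_ = src.Close()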
- // StreamingMergeItem represents an item in the priority queue for streaming merge
- type StreamingMergeItem struct {
- Result *HybridScanResult
- SourceID int
- DataSource StreamingDataSource
- }
- // StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
- type StreamingMergeHeap []*StreamingMergeItem
- func (h StreamingMergeHeap) Len() int { return len(h) }
- func (h StreamingMergeHeap) Less(i, j int) bool {
- // Sort by timestamp (ascending order)
- return h[i].Result.Timestamp < h[j].Result.Timestamp
- }
- func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
- func (h *StreamingMergeHeap) Push(x interface{}) {
- *h = append(*h, x.(*StreamingMergeItem))
- }
- func (h *StreamingMergeHeap) Pop() interface{} {
- old := *h
- n := len(old)
- item := old[n-1]
- *h = old[0 : n-1]
- return item
- }
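- // Note (illustrative): with this Less ordering, container/heap keeps the item with the smallest
- // timestamp at index 0, so heap.Pop in streamingMerge below always yields the chronologically
- // next result across all sources.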
- // Scan reads messages from unflushed broker buffers, live logs, and archived Parquet files
- // Uses SeaweedFS MQ's GenMergedReadFunc for the on-disk sources
- // Behavior:
- // 1. Merges live and archived data chronologically
- // 2. Applies filtering at the lowest level for efficiency
- // 3. Handles schema evolution transparently
- func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
- results, _, err := hms.ScanWithStats(ctx, options)
- return results, err
- }
- // ScanWithStats reads messages and returns scan statistics for execution plans
- func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
- var results []HybridScanResult
- stats := &HybridScanStats{}
- // Get all partitions for this topic via MQ broker discovery
- partitions, err := hms.discoverTopicPartitions(ctx)
- if err != nil {
- return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
- }
- stats.PartitionsScanned = len(partitions)
- for _, partition := range partitions {
- partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
- if err != nil {
- return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
- }
- results = append(results, partitionResults...)
- // Aggregate broker buffer stats
- if partitionStats != nil {
- if partitionStats.BrokerBufferQueried {
- stats.BrokerBufferQueried = true
- }
- stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
- if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
- stats.BufferStartIndex = partitionStats.BufferStartIndex
- }
- }
- // Apply global limit (without offset) across all partitions
- // When OFFSET is used, collect more data to ensure we have enough after skipping
- // Note: OFFSET will be applied at the end to avoid double-application
- if options.Limit > 0 {
- // Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
- minRequired := options.Limit + options.Offset
- // Small buffer only when needed to handle edge cases in distributed scanning
- if options.Offset > 0 && minRequired < 10 {
- minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
- }
- if len(results) >= minRequired {
- break
- }
- }
- }
- // Apply final OFFSET and LIMIT processing (done once at the end)
- // Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
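- // Worked example (illustrative): with 100 merged rows, LIMIT 10 OFFSET 25 first drops the
- // first 25 rows (results = results[25:], leaving 75 rows) and then truncates to 10
- // (results = results[:10]), returning rows 26-35 in chronological order.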
- if options.Offset > 0 || options.Limit >= 0 {
- // Handle LIMIT 0 special case first
- if options.Limit == 0 {
- return []HybridScanResult{}, stats, nil
- }
- // Apply OFFSET first
- if options.Offset > 0 {
- if options.Offset >= len(results) {
- results = []HybridScanResult{}
- } else {
- results = results[options.Offset:]
- }
- }
- // Apply LIMIT after OFFSET (only if limit > 0)
- if options.Limit > 0 && len(results) > options.Limit {
- results = results[:options.Limit]
- }
- }
- return results, stats, nil
- }
- // scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
- func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
- results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
- return results, err
- }
- // scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
- func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
- var results []HybridScanResult
- stats := &HybridScanStats{}
- // Skip if no broker client available
- if hms.brokerClient == nil {
- return results, stats, nil
- }
- // Mark that we attempted to query broker buffer
- stats.BrokerBufferQueried = true
- // Step 1: Get unflushed data from the broker using the buffer_start-based method
- // The broker uses buffer_start metadata to exclude data that has already been flushed to disk, avoiding double-counting
- unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
- if err != nil {
- // Log error but don't fail the query - continue with disk data only
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
- }
- // Reset queried flag on error
- stats.BrokerBufferQueried = false
- return results, stats, nil
- }
- // Capture stats for EXPLAIN
- stats.BrokerBufferMessages = len(unflushedEntries)
- // Debug logging for EXPLAIN mode
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
- if len(unflushedEntries) > 0 {
- fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
- }
- }
- // Step 2: Process unflushed entries (already deduplicated by broker)
- for _, logEntry := range unflushedEntries {
- // Skip control entries without actual data
- if hms.isControlEntry(logEntry) {
- continue // Skip this entry
- }
- // Skip messages outside time range
- if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
- continue
- }
- if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
- continue
- }
- // Convert LogEntry to RecordValue format (same as disk data)
- recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
- if err != nil {
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
- }
- continue // Skip malformed messages
- }
- // Apply predicate filter if provided
- if options.Predicate != nil && !options.Predicate(recordValue) {
- continue
- }
- // Extract system columns for result
- timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
- key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
- // Apply column projection
- values := make(map[string]*schema_pb.Value)
- if len(options.Columns) == 0 {
- // Select all columns (excluding system columns from user view)
- for name, value := range recordValue.Fields {
- if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
- values[name] = value
- }
- }
- } else {
- // Select specified columns only
- for _, columnName := range options.Columns {
- if value, exists := recordValue.Fields[columnName]; exists {
- values[columnName] = value
- }
- }
- }
- // Create result with proper source tagging
- result := HybridScanResult{
- Values: values,
- Timestamp: timestamp,
- Key: key,
- Source: "live_log", // Data from broker's unflushed messages
- }
- results = append(results, result)
- // Apply limit (accounting for offset) - collect exact amount needed
- if options.Limit > 0 {
- // Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
- minRequired := options.Limit + options.Offset
- // Small buffer only when needed to handle edge cases in message streaming
- if options.Offset > 0 && minRequired < 10 {
- minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
- }
- if len(results) >= minRequired {
- break
- }
- }
- }
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
- }
- return results, stats, nil
- }
- // convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
- func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
- // Parse the message data as RecordValue
- recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
- return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
- }
- // Add system columns
- if recordValue.Fields == nil {
- recordValue.Fields = make(map[string]*schema_pb.Value)
- }
- // Add timestamp
- recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
- }
- return recordValue, string(msg.Key), nil
- }
- // discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
- // This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
- func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
- if hms.filerClient == nil {
- return nil, fmt.Errorf("filerClient not available for partition discovery")
- }
- var allPartitions []topic.Partition
- var err error
- // Scan the topic directory for actual partition versions (timestamped directories)
- // List all version directories in the topic directory
- err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
- if !versionEntry.IsDirectory {
- return nil // Skip non-directories
- }
- // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
- versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
- if parseErr != nil {
- // Skip directories that don't match the version format
- return nil
- }
- // Scan partition directories within this version
- versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
- return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
- if !partitionEntry.IsDirectory {
- return nil // Skip non-directories
- }
- // Parse partition boundary from directory name (e.g., "0000-0630")
- rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
- if rangeStart == rangeStop {
- return nil // Skip invalid partition names
- }
- // Create partition object
- partition := topic.Partition{
- RangeStart: rangeStart,
- RangeStop: rangeStop,
- RingSize: topic.PartitionCount,
- UnixTimeNs: versionTime.UnixNano(),
- }
- allPartitions = append(allPartitions, partition)
- return nil
- })
- })
- if err != nil {
- return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
- }
- // If no partitions found, return empty slice (valid for newly created or empty topics)
- if len(allPartitions) == 0 {
- fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
- return []topic.Partition{}, nil
- }
- fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
- return allPartitions, nil
- }
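- // Illustrative directory layout assumed by the discovery above (names are examples only):
- //
- //	<topic.Dir()>/
- //		v2025-09-01-07-16-34/    <- version directory, parsed by topic.ParseTopicVersion
- //			0000-0630/           <- partition boundary, parsed by topic.ParsePartitionBoundary
- //			0630-1260/           <- another partition range (hypothetical)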
- // scanPartitionHybrid scans a specific partition using the hybrid approach,
- // seamlessly reading all data sources:
- // 1. Unflushed in-memory data from brokers (REAL-TIME)
- // 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
- func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
- results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
- return results, err
- }
- // scanPartitionHybridWithStats scans a specific partition using a streaming merge for memory efficiency
- // PERFORMANCE IMPROVEMENT: Uses a heap-based streaming merge instead of collecting all data and sorting
- // - Merge buffer holds O(k) items, where k = number of data sources, instead of O(n) where n = total records
- // - Scalable: Can handle large topics without LIMIT clauses efficiently
- // - Streaming: Processes data as it arrives rather than buffering everything before sorting
- func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
- stats := &HybridScanStats{}
- // STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
- unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
- if err != nil {
- // Don't fail the query if broker scanning fails, but provide clear warning to user
- // This ensures users are aware that results may not include the most recent data
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
- } else {
- fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
- fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
- }
- } else if unflushedStats != nil {
- stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
- stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
- stats.BufferStartIndex = unflushedStats.BufferStartIndex
- }
- // Count live log files for statistics
- liveLogCount, err := hms.countLiveLogFiles(partition)
- if err != nil {
- // Don't fail the query, just log warning
- fmt.Printf("Warning: Failed to count live log files: %v\n", err)
- liveLogCount = 0
- }
- stats.LiveLogFilesScanned = liveLogCount
- // STEP 2: Create streaming data sources for memory-efficient merge
- var dataSources []StreamingDataSource
- // Add unflushed data source (if we have unflushed results)
- if len(unflushedResults) > 0 {
- // Sort unflushed results by timestamp before creating stream
- if len(unflushedResults) > 1 {
- hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
- }
- dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
- }
- // Add streaming flushed data source (live logs + Parquet files)
- flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
- dataSources = append(dataSources, flushedDataSource)
- // STEP 3: Use streaming merge for memory-efficient chronological ordering
- var results []HybridScanResult
- if len(dataSources) > 0 {
- // Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
- // For LIMIT N OFFSET M, we need to collect at least N+M rows
- scanLimit := options.Limit
- if options.Limit > 0 && options.Offset > 0 {
- scanLimit = options.Limit + options.Offset
- }
- mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
- if err != nil {
- return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
- }
- results = mergedResults
- }
- return results, stats, nil
- }
- // countLiveLogFiles counts the number of live log files in a partition for statistics
- func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
- partitionDir := topic.PartitionDir(hms.topic, partition)
- var fileCount int
- err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // List all files in partition directory
- request := &filer_pb.ListEntriesRequest{
- Directory: partitionDir,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: true,
- Limit: 10000, // reasonable limit for counting
- }
- stream, err := client.ListEntries(context.Background(), request)
- if err != nil {
- return err
- }
- for {
- resp, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
- // Count files that are not .parquet files (live log files)
- // Live log files typically have timestamps or are named like log files
- fileName := resp.Entry.Name
- if !strings.HasSuffix(fileName, ".parquet") &&
- !strings.HasSuffix(fileName, ".offset") &&
- len(resp.Entry.Chunks) > 0 { // Has actual content
- fileCount++
- }
- }
- return nil
- })
- if err != nil {
- return 0, err
- }
- return fileCount, nil
- }
- // isControlEntry checks if a log entry is a control entry without actual data
- // Based on MQ system analysis, control entries are:
- // 1. DataMessages with populated Ctrl field (publisher close signals)
- // 2. Entries with empty keys (as filtered by subscriber)
- // 3. Entries with no data
- func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
- // Skip entries with no data
- if len(logEntry.Data) == 0 {
- return true
- }
- // Skip entries with empty keys (same logic as subscriber)
- if len(logEntry.Key) == 0 {
- return true
- }
- // Check if this is a DataMessage with control field populated
- dataMessage := &mq_pb.DataMessage{}
- if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
- // If it has a control field, it's a control message
- if dataMessage.Ctrl != nil {
- return true
- }
- }
- return false
- }
- // convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
- // This handles both:
- // 1. Live log entries (raw message format)
- // 2. Parquet entries (already in schema_pb.RecordValue format)
- func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
- // Try to unmarshal as RecordValue first (Parquet format)
- recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
- // This is an archived message from Parquet files
- // FIX: Add system columns from LogEntry to RecordValue
- if recordValue.Fields == nil {
- recordValue.Fields = make(map[string]*schema_pb.Value)
- }
- // Add system columns from LogEntry
- recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
- }
- recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
- }
- return recordValue, "parquet_archive", nil
- }
- // If not a RecordValue, this is raw live message data - parse with schema
- return hms.parseRawMessageWithSchema(logEntry)
- }
- // parseRawMessageWithSchema parses raw live message data using the topic's schema
- // This provides proper type conversion and field mapping instead of treating everything as strings
- func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
- recordValue := &schema_pb.RecordValue{
- Fields: make(map[string]*schema_pb.Value),
- }
- // Add system columns (always present)
- recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
- }
- recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
- }
- // Parse message data based on schema
- if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
- // Fallback: No schema available, treat as single "data" field
- recordValue.Fields["data"] = &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
- }
- return recordValue, "live_log", nil
- }
- // Attempt schema-aware parsing
- // Strategy 1: Try JSON parsing first (most common for live messages)
- if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
- // Successfully parsed as JSON, merge with system columns
- for fieldName, fieldValue := range parsedRecord.Fields {
- recordValue.Fields[fieldName] = fieldValue
- }
- return recordValue, "live_log", nil
- }
- // Strategy 2: Try protobuf parsing (binary messages)
- if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
- // Successfully parsed as protobuf, merge with system columns
- for fieldName, fieldValue := range parsedRecord.Fields {
- recordValue.Fields[fieldName] = fieldValue
- }
- return recordValue, "live_log", nil
- }
- // Strategy 3: Fallback to single field with raw data
- // If schema has a single field, map the raw data to it with type conversion
- if len(hms.recordSchema.Fields) == 1 {
- field := hms.recordSchema.Fields[0]
- convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
- if err == nil {
- recordValue.Fields[field.Name] = convertedValue
- return recordValue, "live_log", nil
- }
- }
- // Final fallback: treat as string data field
- recordValue.Fields["data"] = &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
- }
- return recordValue, "live_log", nil
- }
- // parseJSONMessage attempts to parse raw data as JSON and map to schema fields
- func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
- // Try to parse as JSON
- var jsonData map[string]interface{}
- if err := json.Unmarshal(data, &jsonData); err != nil {
- return nil, fmt.Errorf("not valid JSON: %v", err)
- }
- recordValue := &schema_pb.RecordValue{
- Fields: make(map[string]*schema_pb.Value),
- }
- // Map JSON fields to schema fields
- for _, schemaField := range hms.recordSchema.Fields {
- fieldName := schemaField.Name
- if jsonValue, exists := jsonData[fieldName]; exists {
- schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
- if err != nil {
- // Log conversion error but continue with other fields
- continue
- }
- recordValue.Fields[fieldName] = schemaValue
- }
- }
- return recordValue, nil
- }
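- // Illustrative sketch (an assumption, not from the original code): given a schema with fields
- // "id" (INT64) and "name" (STRING), a live JSON message {"id": 42, "name": "alice", "extra": true}
- // maps to RecordValue fields id=42 and name="alice"; "extra" is ignored because it is not in the
- // schema, and any field that fails type conversion is skipped rather than failing the whole row.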
- // parseProtobufMessage attempts to parse raw data as protobuf RecordValue
- func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
- // This might be a raw protobuf message that didn't parse correctly the first time
- // Try alternative protobuf unmarshaling approaches
- recordValue := &schema_pb.RecordValue{}
- // Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
- if err := proto.Unmarshal(data, recordValue); err == nil {
- return recordValue, nil
- }
- // Strategy 2: Check if it's a different protobuf message type
- // For now, return error as we need more specific knowledge of MQ message formats
- return nil, fmt.Errorf("could not parse as protobuf RecordValue")
- }
- // convertRawDataToSchemaValue converts raw bytes to a specific schema type
- func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
- dataStr := string(data)
- switch fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- scalarType := fieldType.GetScalarType()
- switch scalarType {
- case schema_pb.ScalarType_STRING:
- return &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
- }, nil
- case schema_pb.ScalarType_INT32:
- if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
- }, nil
- }
- case schema_pb.ScalarType_INT64:
- if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: val},
- }, nil
- }
- case schema_pb.ScalarType_FLOAT:
- if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
- }, nil
- }
- case schema_pb.ScalarType_DOUBLE:
- if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
- }, nil
- }
- case schema_pb.ScalarType_BOOL:
- lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
- if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_BoolValue{BoolValue: true},
- }, nil
- } else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_BoolValue{BoolValue: false},
- }, nil
- }
- case schema_pb.ScalarType_BYTES:
- return &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: data},
- }, nil
- }
- }
- return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
- }
- // convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
- func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
- switch fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- scalarType := fieldType.GetScalarType()
- switch scalarType {
- case schema_pb.ScalarType_STRING:
- if str, ok := jsonValue.(string); ok {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: str},
- }, nil
- }
- // Convert other types to string
- return &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
- }, nil
- case schema_pb.ScalarType_INT32:
- if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
- return &schema_pb.Value{
- Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
- }, nil
- }
- case schema_pb.ScalarType_INT64:
- if num, ok := jsonValue.(float64); ok {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
- }, nil
- }
- case schema_pb.ScalarType_FLOAT:
- if num, ok := jsonValue.(float64); ok {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
- }, nil
- }
- case schema_pb.ScalarType_DOUBLE:
- if num, ok := jsonValue.(float64); ok {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
- }, nil
- }
- case schema_pb.ScalarType_BOOL:
- if boolVal, ok := jsonValue.(bool); ok {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
- }, nil
- }
- case schema_pb.ScalarType_BYTES:
- if str, ok := jsonValue.(string); ok {
- return &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
- }, nil
- }
- }
- }
- return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
- }
- // ConvertToSQLResult converts HybridScanResults to SQL query results
- func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
- if len(results) == 0 {
- return &QueryResult{
- Columns: columns,
- Rows: [][]sqltypes.Value{},
- Database: hms.topic.Namespace,
- Table: hms.topic.Name,
- }
- }
- // Determine columns if not specified
- if len(columns) == 0 {
- columnSet := make(map[string]bool)
- for _, result := range results {
- for columnName := range result.Values {
- columnSet[columnName] = true
- }
- }
- columns = make([]string, 0, len(columnSet))
- for columnName := range columnSet {
- columns = append(columns, columnName)
- }
- }
- // Convert to SQL rows
- rows := make([][]sqltypes.Value, len(results))
- for i, result := range results {
- row := make([]sqltypes.Value, len(columns))
- for j, columnName := range columns {
- switch columnName {
- case SW_COLUMN_NAME_SOURCE:
- row[j] = sqltypes.NewVarChar(result.Source)
- case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
- // Format timestamp as proper timestamp type instead of raw nanoseconds
- row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
- case SW_COLUMN_NAME_KEY:
- row[j] = sqltypes.NewVarBinary(string(result.Key))
- default:
- if value, exists := result.Values[columnName]; exists {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- }
- }
- rows[i] = row
- }
- return &QueryResult{
- Columns: columns,
- Rows: rows,
- Database: hms.topic.Namespace,
- Table: hms.topic.Name,
- }
- }
- // ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
- // Combines auto-discovered columns (from *) with explicitly requested columns
- func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
- if len(results) == 0 {
- // For empty results, combine auto-discovered columns with explicit ones
- columnSet := make(map[string]bool)
- // Add explicit columns first
- for _, col := range explicitColumns {
- columnSet[col] = true
- }
- // Build final column list
- columns := make([]string, 0, len(columnSet))
- for col := range columnSet {
- columns = append(columns, col)
- }
- return &QueryResult{
- Columns: columns,
- Rows: [][]sqltypes.Value{},
- Database: hms.topic.Namespace,
- Table: hms.topic.Name,
- }
- }
- // Auto-discover columns from data (like SELECT *)
- autoColumns := make(map[string]bool)
- for _, result := range results {
- for columnName := range result.Values {
- autoColumns[columnName] = true
- }
- }
- // Combine auto-discovered and explicit columns
- columnSet := make(map[string]bool)
- // Add auto-discovered columns first (regular data columns)
- for col := range autoColumns {
- columnSet[col] = true
- }
- // Add explicit columns (may include system columns like _source)
- for _, col := range explicitColumns {
- columnSet[col] = true
- }
- // Build final column list
- columns := make([]string, 0, len(columnSet))
- for col := range columnSet {
- columns = append(columns, col)
- }
- // Convert to SQL rows
- rows := make([][]sqltypes.Value, len(results))
- for i, result := range results {
- row := make([]sqltypes.Value, len(columns))
- for j, columnName := range columns {
- switch columnName {
- case SW_COLUMN_NAME_TIMESTAMP:
- row[j] = sqltypes.NewInt64(result.Timestamp)
- case SW_COLUMN_NAME_KEY:
- row[j] = sqltypes.NewVarBinary(string(result.Key))
- case SW_COLUMN_NAME_SOURCE:
- row[j] = sqltypes.NewVarChar(result.Source)
- default:
- // Regular data column
- if value, exists := result.Values[columnName]; exists {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- }
- }
- rows[i] = row
- }
- return &QueryResult{
- Columns: columns,
- Rows: rows,
- Database: hms.topic.Namespace,
- Table: hms.topic.Name,
- }
- }
- // ReadParquetStatistics efficiently reads column statistics from parquet files
- // without scanning the full file content - uses parquet's built-in metadata
- func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
- var fileStats []*ParquetFileStats
- // Use the same chunk cache as the logstore package
- chunkCache := chunk_cache.NewChunkCacheInMemory(256)
- lookupFileIdFn := filer.LookupFn(h.filerClient)
- err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- // Only process parquet files
- if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
- return nil
- }
- // Extract statistics from this parquet file
- stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
- if err != nil {
- // Log error but continue processing other files
- fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
- return nil
- }
- if stats != nil {
- fileStats = append(fileStats, stats)
- }
- return nil
- })
- return fileStats, err
- }
- // extractParquetFileStats extracts column statistics from a single parquet file
- func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
- // Create reader for the parquet file
- fileSize := filer.FileSize(entry)
- visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
- chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
- readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
- readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
- // Create parquet reader - this only reads metadata, not data
- parquetReader := parquet.NewReader(readerAt)
- defer parquetReader.Close()
- fileView := parquetReader.File()
- fileStats := &ParquetFileStats{
- FileName: entry.Name,
- RowCount: fileView.NumRows(),
- ColumnStats: make(map[string]*ParquetColumnStats),
- }
- // Populate optional min/max from filer extended attributes (writer stores ns timestamps)
- if entry != nil && entry.Extended != nil {
- if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
- fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
- }
- if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
- fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
- }
- }
- // Get schema information
- schema := fileView.Schema()
- // Process each row group
- rowGroups := fileView.RowGroups()
- for _, rowGroup := range rowGroups {
- columnChunks := rowGroup.ColumnChunks()
- // Process each column chunk
- for i, chunk := range columnChunks {
- // Get column name from schema
- columnName := h.getColumnNameFromSchema(schema, i)
- if columnName == "" {
- continue
- }
- // Try to get column statistics
- columnIndex, err := chunk.ColumnIndex()
- if err != nil {
- // No column index available - skip this column
- continue
- }
- // Take the min value from the first page and the max value from the last page (a simplification)
- // In a more thorough implementation, we could aggregate min/max across all pages
- numPages := columnIndex.NumPages()
- if numPages == 0 {
- continue
- }
- minParquetValue := columnIndex.MinValue(0)
- maxParquetValue := columnIndex.MaxValue(numPages - 1)
- nullCount := int64(0)
- // Aggregate null counts across all pages
- for pageIdx := 0; pageIdx < numPages; pageIdx++ {
- nullCount += columnIndex.NullCount(pageIdx)
- }
- // Convert parquet values to schema_pb.Value
- minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
- if err != nil {
- continue
- }
- maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
- if err != nil {
- continue
- }
- // Store column statistics (aggregate across row groups if column already exists)
- if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
- // Update existing statistics
- if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
- existingStats.MinValue = minValue
- }
- if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
- existingStats.MaxValue = maxValue
- }
- existingStats.NullCount += nullCount
- } else {
- // Create new column statistics
- fileStats.ColumnStats[columnName] = &ParquetColumnStats{
- ColumnName: columnName,
- MinValue: minValue,
- MaxValue: maxValue,
- NullCount: nullCount,
- RowCount: rowGroup.NumRows(),
- }
- }
- }
- }
- return fileStats, nil
- }
- // getColumnNameFromSchema extracts column name from parquet schema by index
- func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
- // Get the leaf columns in order
- var columnNames []string
- h.collectColumnNames(schema.Fields(), &columnNames)
- if columnIndex >= 0 && columnIndex < len(columnNames) {
- return columnNames[columnIndex]
- }
- return ""
- }
- // collectColumnNames recursively collects leaf column names from schema
- func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
- for _, field := range fields {
- if len(field.Fields()) == 0 {
- // This is a leaf field (no sub-fields)
- *names = append(*names, field.Name())
- } else {
- // This is a group - recurse
- h.collectColumnNames(field.Fields(), names)
- }
- }
- }
- // convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
- func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
- switch pv.Kind() {
- case parquet.Boolean:
- return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
- case parquet.Int32:
- return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
- case parquet.Int64:
- return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
- case parquet.Float:
- return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
- case parquet.Double:
- return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
- case parquet.ByteArray:
- return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
- default:
- return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
- }
- }
- // compareSchemaValues compares two schema_pb.Value objects
- func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
- if v1 == nil && v2 == nil {
- return 0
- }
- if v1 == nil {
- return -1
- }
- if v2 == nil {
- return 1
- }
- // Extract raw values and compare
- raw1 := h.extractRawValueFromSchema(v1)
- raw2 := h.extractRawValueFromSchema(v2)
- return h.compareRawValues(raw1, raw2)
- }
- // extractRawValueFromSchema extracts the raw value from schema_pb.Value
- func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
- switch v := value.Kind.(type) {
- case *schema_pb.Value_BoolValue:
- return v.BoolValue
- case *schema_pb.Value_Int32Value:
- return v.Int32Value
- case *schema_pb.Value_Int64Value:
- return v.Int64Value
- case *schema_pb.Value_FloatValue:
- return v.FloatValue
- case *schema_pb.Value_DoubleValue:
- return v.DoubleValue
- case *schema_pb.Value_BytesValue:
- return string(v.BytesValue) // Convert to string for comparison
- case *schema_pb.Value_StringValue:
- return v.StringValue
- }
- return nil
- }
- // compareRawValues compares two raw values
- func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
- // Handle nil cases
- if v1 == nil && v2 == nil {
- return 0
- }
- if v1 == nil {
- return -1
- }
- if v2 == nil {
- return 1
- }
- // Compare based on type
- switch val1 := v1.(type) {
- case bool:
- if val2, ok := v2.(bool); ok {
- if val1 == val2 {
- return 0
- }
- if val1 {
- return 1
- }
- return -1
- }
- case int32:
- if val2, ok := v2.(int32); ok {
- if val1 < val2 {
- return -1
- } else if val1 > val2 {
- return 1
- }
- return 0
- }
- case int64:
- if val2, ok := v2.(int64); ok {
- if val1 < val2 {
- return -1
- } else if val1 > val2 {
- return 1
- }
- return 0
- }
- case float32:
- if val2, ok := v2.(float32); ok {
- if val1 < val2 {
- return -1
- } else if val1 > val2 {
- return 1
- }
- return 0
- }
- case float64:
- if val2, ok := v2.(float64); ok {
- if val1 < val2 {
- return -1
- } else if val1 > val2 {
- return 1
- }
- return 0
- }
- case string:
- if val2, ok := v2.(string); ok {
- if val1 < val2 {
- return -1
- } else if val1 > val2 {
- return 1
- }
- return 0
- }
- }
- // Default: try string comparison
- str1 := fmt.Sprintf("%v", v1)
- str2 := fmt.Sprintf("%v", v2)
- if str1 < str2 {
- return -1
- } else if str1 > str2 {
- return 1
- }
- return 0
- }
- // streamingMerge merges multiple sorted data sources using a heap-based approach
- // This provides memory-efficient merging without loading all data into memory
- func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
- if len(dataSources) == 0 {
- return nil, nil
- }
- var results []HybridScanResult
- mergeHeap := &StreamingMergeHeap{}
- heap.Init(mergeHeap)
- // Initialize heap with first item from each data source
- for i, source := range dataSources {
- if source.HasMore() {
- result, err := source.Next()
- if err != nil {
- // Close all sources and return error
- for _, s := range dataSources {
- s.Close()
- }
- return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
- }
- if result != nil {
- heap.Push(mergeHeap, &StreamingMergeItem{
- Result: result,
- SourceID: i,
- DataSource: source,
- })
- }
- }
- }
- // Process results in chronological order
- for mergeHeap.Len() > 0 {
- // Get next chronologically ordered result
- item := heap.Pop(mergeHeap).(*StreamingMergeItem)
- results = append(results, *item.Result)
- // Check limit
- if limit > 0 && len(results) >= limit {
- break
- }
- // Try to get next item from the same data source
- if item.DataSource.HasMore() {
- nextResult, err := item.DataSource.Next()
- if err != nil {
- // Log error but continue with other sources
- fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
- } else if nextResult != nil {
- heap.Push(mergeHeap, &StreamingMergeItem{
- Result: nextResult,
- SourceID: item.SourceID,
- DataSource: item.DataSource,
- })
- }
- }
- }
- // Close all data sources
- for _, source := range dataSources {
- source.Close()
- }
- return results, nil
- }
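- // Worked example (illustrative): given two sorted sources with timestamps [1, 4, 7] and [2, 3, 9],
- // the heap holds one head per source, so successive pops yield 1, 2, 3, 4, 7, 9 - a full
- // chronological merge while keeping at most len(dataSources) pending items in memory
- // (plus the accumulated output slice).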
- // SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
- // This is used for unflushed data that is already loaded into memory
- type SliceDataSource struct {
- results []HybridScanResult
- index int
- }
- func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
- return &SliceDataSource{
- results: results,
- index: 0,
- }
- }
- func (s *SliceDataSource) Next() (*HybridScanResult, error) {
- if s.index >= len(s.results) {
- return nil, nil
- }
- result := &s.results[s.index]
- s.index++
- return result, nil
- }
- func (s *SliceDataSource) HasMore() bool {
- return s.index < len(s.results)
- }
- func (s *SliceDataSource) Close() error {
- return nil // Nothing to clean up for slice-based source
- }
- // StreamingFlushedDataSource provides streaming access to flushed data
- type StreamingFlushedDataSource struct {
- hms *HybridMessageScanner
- partition topic.Partition
- options HybridScanOptions
- mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
- resultChan chan *HybridScanResult
- errorChan chan error
- doneChan chan struct{}
- started bool
- finished bool
- closed int32 // atomic flag to prevent double close
- mu sync.RWMutex
- }
- func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
- mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)
- return &StreamingFlushedDataSource{
- hms: hms,
- partition: partition,
- options: options,
- mergedReadFn: mergedReadFn,
- resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance
- errorChan: make(chan error, 1),
- doneChan: make(chan struct{}),
- started: false,
- finished: false,
- }
- }
- func (s *StreamingFlushedDataSource) startStreaming() {
- if s.started {
- return
- }
- s.started = true
- go func() {
- defer func() {
- // Use atomic flag to ensure channels are only closed once
- if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
- close(s.resultChan)
- close(s.errorChan)
- close(s.doneChan)
- }
- }()
- // Set up time range for scanning
- startTime := time.Unix(0, s.options.StartTimeNs)
- if s.options.StartTimeNs == 0 {
- startTime = time.Unix(0, 0)
- }
- stopTsNs := s.options.StopTimeNs
- // For SQL queries, stopTsNs = 0 means "no stop time restriction"
- // This is different from message queue consumers which want to stop at "now"
- // We detect SQL context by checking if we have a predicate function
- if stopTsNs == 0 && s.options.Predicate == nil {
- // Only set to current time for non-SQL queries (message queue consumers)
- stopTsNs = time.Now().UnixNano()
- }
- // If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning
- // Message processing function
- eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
- // Skip control entries without actual data
- if s.hms.isControlEntry(logEntry) {
- return false, nil // Skip this entry
- }
- // Convert log entry to schema_pb.RecordValue for consistent processing
- recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
- if convertErr != nil {
- return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
- }
- // Apply predicate filtering (WHERE clause)
- if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
- return false, nil // Skip this message
- }
- // Extract system columns
- timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
- key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
- // Apply column projection
- values := make(map[string]*schema_pb.Value)
- if len(s.options.Columns) == 0 {
- // Select all columns (excluding system columns from user view)
- for name, value := range recordValue.Fields {
- if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
- values[name] = value
- }
- }
- } else {
- // Select specified columns only
- for _, columnName := range s.options.Columns {
- if value, exists := recordValue.Fields[columnName]; exists {
- values[columnName] = value
- }
- }
- }
- result := &HybridScanResult{
- Values: values,
- Timestamp: timestamp,
- Key: key,
- Source: source,
- }
- // Check if already closed before trying to send
- if atomic.LoadInt32(&s.closed) != 0 {
- return true, nil // Stop processing if closed
- }
- // Send result to channel with proper handling of closed channels
- select {
- case s.resultChan <- result:
- return false, nil
- case <-s.doneChan:
- return true, nil // Stop processing if closed
- default:
- // Check again if closed (in case it was closed between the atomic check and select)
- if atomic.LoadInt32(&s.closed) != 0 {
- return true, nil
- }
- // If not closed, try sending again with blocking select
- select {
- case s.resultChan <- result:
- return false, nil
- case <-s.doneChan:
- return true, nil
- }
- }
- }
- // Start scanning from the specified position
- startPosition := log_buffer.MessagePosition{Time: startTime}
- _, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
- if err != nil {
- // Only try to send error if not already closed
- if atomic.LoadInt32(&s.closed) == 0 {
- select {
- case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
- case <-s.doneChan:
- default:
- // Channel might be full or closed, ignore
- }
- }
- }
- s.finished = true
- }()
- }
- func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
- if !s.started {
- s.startStreaming()
- }
- select {
- case result, ok := <-s.resultChan:
- if !ok {
- return nil, nil // No more results
- }
- return result, nil
- case err := <-s.errorChan:
- return nil, err
- case <-s.doneChan:
- return nil, nil
- }
- }
- func (s *StreamingFlushedDataSource) HasMore() bool {
- if !s.started {
- return true // Haven't started yet, so potentially has data
- }
- return !s.finished || len(s.resultChan) > 0
- }
- func (s *StreamingFlushedDataSource) Close() error {
- // Use atomic flag to ensure channels are only closed once
- if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
- close(s.doneChan)
- close(s.resultChan)
- close(s.errorChan)
- }
- return nil
- }
- // mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
- func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
- if left < right {
- mid := left + (right-left)/2
- // Recursively sort both halves
- hms.mergeSort(results, left, mid)
- hms.mergeSort(results, mid+1, right)
- // Merge the sorted halves
- hms.merge(results, left, mid, right)
- }
- }
- // merge combines two sorted subarrays into a single sorted array
- func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
- // Create temporary arrays for the two subarrays
- leftArray := make([]HybridScanResult, mid-left+1)
- rightArray := make([]HybridScanResult, right-mid)
- // Copy data to temporary arrays
- copy(leftArray, results[left:mid+1])
- copy(rightArray, results[mid+1:right+1])
- // Merge the temporary arrays back into results[left..right]
- i, j, k := 0, 0, left
- for i < len(leftArray) && j < len(rightArray) {
- if leftArray[i].Timestamp <= rightArray[j].Timestamp {
- results[k] = leftArray[i]
- i++
- } else {
- results[k] = rightArray[j]
- j++
- }
- k++
- }
- // Copy remaining elements of leftArray, if any
- for i < len(leftArray) {
- results[k] = leftArray[i]
- i++
- k++
- }
- // Copy remaining elements of rightArray, if any
- for j < len(rightArray) {
- results[k] = rightArray[j]
- j++
- k++
- }
- }