hybrid_message_scanner.go

package engine

import (
	"container/heap"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
	"google.golang.org/protobuf/proto"
)

// HybridMessageScanner scans from ALL data sources.
//
// Architecture:
//  1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
//  2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
//  3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
//  4. Seamlessly merges data from all sources chronologically
//  5. Provides a complete real-time view of all messages in a topic
type HybridMessageScanner struct {
	filerClient   filer_pb.FilerClient
	brokerClient  BrokerClientInterface // For querying unflushed data
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	parquetLevels *schema.ParquetLevels
	engine        *SQLEngine // Reference for system column formatting
}

// NewHybridMessageScanner creates a scanner that reads from all data sources.
// This provides complete real-time message coverage, including unflushed data.
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
	// Check if filerClient is available
	if filerClient == nil {
		return nil, fmt.Errorf("filerClient is required but not available")
	}

	// Create topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Get topic schema from broker client (works with both real and mock clients)
	recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
	if err != nil {
		return nil, fmt.Errorf("failed to get topic schema: %v", err)
	}
	if recordType == nil {
		return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
	}

	// Create a copy of the recordType to avoid modifying the original
	recordTypeCopy := &schema_pb.RecordType{
		Fields: make([]*schema_pb.Field, len(recordType.Fields)),
	}
	copy(recordTypeCopy.Fields, recordType.Fields)

	// Add system columns that MQ adds to all records
	recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	// Convert to Parquet levels for efficient reading
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
	}

	return &HybridMessageScanner{
		filerClient:   filerClient,
		brokerClient:  brokerClient,
		topic:         t,
		recordSchema:  recordType,
		parquetLevels: parquetLevels,
		engine:        engine,
	}, nil
}

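// Illustrative usage sketch (not part of the original source): assumes a
// connected filerClient, a brokerClient implementing BrokerClientInterface,
// and an engine instance are already available.
//
//	scanner, err := NewHybridMessageScanner(filerClient, brokerClient, "default", "user_events", engine)
//	if err != nil {
//		// the topic may have no schema yet (NoSchemaError), or the broker may be unreachable
//		return err
//	}
//	results, err := scanner.Scan(ctx, HybridScanOptions{Limit: -1}) // -1 = no limit
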
// HybridScanOptions configures how the scanner reads from both live and archived data
type HybridScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit: -1 means no limit, 0 means LIMIT 0 (empty result), >0 limits to N rows
	Limit int

	// Row offset - 0 means no offset
	Offset int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}

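// Illustrative options (assumed field names and values): a scan of the last
// hour projecting two columns, roughly equivalent to
// `... WHERE user_id = 42 LIMIT 100 OFFSET 10`.
//
//	now := time.Now().UnixNano()
//	opts := HybridScanOptions{
//		StartTimeNs: now - time.Hour.Nanoseconds(),
//		StopTimeNs:  now,
//		Columns:     []string{"user_id", "action"},
//		Limit:       100,
//		Offset:      10,
//		Predicate: func(r *schema_pb.RecordValue) bool {
//			return r.Fields["user_id"].GetInt64Value() == 42
//		},
//	}
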
// HybridScanResult represents a message from either live logs or Parquet files
type HybridScanResult struct {
	Values    map[string]*schema_pb.Value // Column name -> value
	Timestamp int64                       // Message timestamp (_ts_ns)
	Key       []byte                      // Message key (_key)
	Source    string                      // "live_log", "parquet_archive", or "in_memory_broker"
}

// HybridScanStats contains statistics about the data sources scanned
type HybridScanStats struct {
	BrokerBufferQueried  bool
	BrokerBufferMessages int
	BufferStartIndex     int64
	PartitionsScanned    int
	LiveLogFilesScanned  int // Number of live log files processed
}

// ParquetColumnStats holds statistics for a single column from parquet metadata
type ParquetColumnStats struct {
	ColumnName string
	MinValue   *schema_pb.Value
	MaxValue   *schema_pb.Value
	NullCount  int64
	RowCount   int64
}

// ParquetFileStats holds aggregated statistics for a parquet file
type ParquetFileStats struct {
	FileName    string
	RowCount    int64
	ColumnStats map[string]*ParquetColumnStats

	// Optional file-level timestamp range from filer extended attributes
	MinTimestampNs int64
	MaxTimestampNs int64
}

// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
	if fileStats == nil {
		return 0, 0, false
	}

	// Prefer column stats for _ts_ns if present
	if len(fileStats.ColumnStats) > 0 {
		if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
			if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
				if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
					return minNs, maxNs, true
				}
			}
		}
	}

	// Fall back to the file-level range if present in filer extended metadata
	if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
		return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
	}

	return 0, 0, false
}

// schemaValueToNs converts a schema_pb.Value that represents a timestamp to nanoseconds
func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
	if v == nil {
		return 0, false
	}
	switch k := v.Kind.(type) {
	case *schema_pb.Value_Int64Value:
		return k.Int64Value, true
	case *schema_pb.Value_Int32Value:
		return int64(k.Int32Value), true
	default:
		return 0, false
	}
}

// StreamingDataSource provides a streaming interface for reading scan results
type StreamingDataSource interface {
	Next() (*HybridScanResult, error) // Returns the next result, or nil when done
	HasMore() bool                    // Returns true if more data is available
	Close() error                     // Cleans up resources
}

// StreamingMergeItem represents an item in the priority queue for streaming merge
type StreamingMergeItem struct {
	Result     *HybridScanResult
	SourceID   int
	DataSource StreamingDataSource
}

// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
type StreamingMergeHeap []*StreamingMergeItem

func (h StreamingMergeHeap) Len() int { return len(h) }

func (h StreamingMergeHeap) Less(i, j int) bool {
	// Sort by timestamp (ascending order)
	return h[i].Result.Timestamp < h[j].Result.Timestamp
}

func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *StreamingMergeHeap) Push(x interface{}) {
	*h = append(*h, x.(*StreamingMergeItem))
}

func (h *StreamingMergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[0 : n-1]
	return item
}

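// Minimal sketch of how container/heap drives this min-heap (illustrative
// timestamps, not from the original source): the root is always the earliest
// pending result across all pushed items.
//
//	h := &StreamingMergeHeap{}
//	heap.Init(h)
//	heap.Push(h, &StreamingMergeItem{Result: &HybridScanResult{Timestamp: 30}})
//	heap.Push(h, &StreamingMergeItem{Result: &HybridScanResult{Timestamp: 10}})
//	heap.Push(h, &StreamingMergeItem{Result: &HybridScanResult{Timestamp: 20}})
//	item := heap.Pop(h).(*StreamingMergeItem) // item.Result.Timestamp == 10
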
// Scan reads messages from both live logs and archived Parquet files.
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration.
//
// Behavior:
//  1. Chronologically merges live and archived data
//  2. Applies filtering at the lowest level for efficiency
//  3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.ScanWithStats(ctx, options)
	return results, err
}

// ScanWithStats reads messages and returns scan statistics for execution plans
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	var results []HybridScanResult
	stats := &HybridScanStats{}

	// Get all partitions for this topic via MQ broker discovery
	partitions, err := hms.discoverTopicPartitions(ctx)
	if err != nil {
		return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
	}
	stats.PartitionsScanned = len(partitions)

	for _, partition := range partitions {
		partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
		if err != nil {
			return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}
		results = append(results, partitionResults...)

		// Aggregate broker buffer stats
		if partitionStats != nil {
			if partitionStats.BrokerBufferQueried {
				stats.BrokerBufferQueried = true
			}
			stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
			if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
				stats.BufferStartIndex = partitionStats.BufferStartIndex
			}
		}

		// Apply the global limit (without offset) across all partitions.
		// When OFFSET is used, collect more data to ensure we have enough after skipping.
		// Note: OFFSET is applied at the end to avoid double-application.
		if options.Limit > 0 {
			// Collect the exact amount needed: LIMIT + OFFSET (no excessive doubling)
			minRequired := options.Limit + options.Offset
			// Small buffer only when needed to handle edge cases in distributed scanning
			if options.Offset > 0 && minRequired < 10 {
				minRequired = minRequired + 1 // Add 1 extra row of buffer, not doubling
			}
			if len(results) >= minRequired {
				break
			}
		}
	}

	// Apply final OFFSET and LIMIT processing (done once at the end).
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
	if options.Offset > 0 || options.Limit >= 0 {
		// Handle the LIMIT 0 special case first
		if options.Limit == 0 {
			return []HybridScanResult{}, stats, nil
		}

		// Apply OFFSET first
		if options.Offset > 0 {
			if options.Offset >= len(results) {
				results = []HybridScanResult{}
			} else {
				results = results[options.Offset:]
			}
		}

		// Apply LIMIT after OFFSET (only if limit > 0)
		if options.Limit > 0 && len(results) > options.Limit {
			results = results[:options.Limit]
		}
	}

	return results, stats, nil
}

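// Worked example of the OFFSET/LIMIT pass above (illustrative values): for
// LIMIT 10 OFFSET 5, the partition loop collects at least 15 rows, then the
// final pass slices results[5:] and truncates to 10 rows. If Offset >= len(results)
// the result is empty, and Limit == 0 short-circuits to an empty result set.
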
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
	return results, err
}

// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	var results []HybridScanResult
	stats := &HybridScanStats{}

	// Skip if no broker client is available
	if hms.brokerClient == nil {
		return results, stats, nil
	}

	// Mark that we attempted to query the broker buffer
	stats.BrokerBufferQueried = true

	// Step 1: Get unflushed data from the broker using the buffer_start-based method.
	// This method uses buffer_start metadata to avoid double-counting with exact precision.
	unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
	if err != nil {
		// Log the error but don't fail the query - continue with disk data only
		if isDebugMode(ctx) {
			fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
		}
		// Reset the queried flag on error
		stats.BrokerBufferQueried = false
		return results, stats, nil
	}

	// Capture stats for EXPLAIN
	stats.BrokerBufferMessages = len(unflushedEntries)

	// Debug logging for EXPLAIN mode
	if isDebugMode(ctx) {
		fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
		if len(unflushedEntries) > 0 {
			fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
		}
	}

	// Step 2: Process unflushed entries (already deduplicated by the broker)
	for _, logEntry := range unflushedEntries {
		// Skip control entries without actual data
		if hms.isControlEntry(logEntry) {
			continue
		}

		// Skip messages outside the time range
		if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
			continue
		}
		if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
			continue
		}

		// Convert the LogEntry to RecordValue format (same as disk data)
		recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
		if err != nil {
			if isDebugMode(ctx) {
				fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
			}
			continue // Skip malformed messages
		}

		// Apply the predicate filter if provided
		if options.Predicate != nil && !options.Predicate(recordValue) {
			continue
		}

		// Extract system columns for the result
		timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
		key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

		// Apply column projection
		values := make(map[string]*schema_pb.Value)
		if len(options.Columns) == 0 {
			// Select all columns (excluding system columns from the user view)
			for name, value := range recordValue.Fields {
				if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
					values[name] = value
				}
			}
		} else {
			// Select the specified columns only
			for _, columnName := range options.Columns {
				if value, exists := recordValue.Fields[columnName]; exists {
					values[columnName] = value
				}
			}
		}

		// Create the result with proper source tagging
		result := HybridScanResult{
			Values:    values,
			Timestamp: timestamp,
			Key:       key,
			Source:    "live_log", // Data from the broker's unflushed messages
		}
		results = append(results, result)

		// Apply the limit (accounting for offset) - collect the exact amount needed
		if options.Limit > 0 {
			// Collect the exact amount needed: LIMIT + OFFSET (no excessive doubling)
			minRequired := options.Limit + options.Offset
			// Small buffer only when needed to handle edge cases in message streaming
			if options.Offset > 0 && minRequired < 10 {
				minRequired = minRequired + 1 // Add 1 extra row of buffer, not doubling
			}
			if len(results) >= minRequired {
				break
			}
		}
	}

	if isDebugMode(ctx) {
		fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
	}

	return results, stats, nil
}

// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
	// Parse the message data as a RecordValue
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
		return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
	}

	// Add the timestamp system column; the message key is returned separately
	if recordValue.Fields == nil {
		recordValue.Fields = make(map[string]*schema_pb.Value)
	}
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
	}

	return recordValue, string(msg.Key), nil
}

// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem.
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
	if hms.filerClient == nil {
		return nil, fmt.Errorf("filerClient not available for partition discovery")
	}

	var allPartitions []topic.Partition
	var err error

	// Scan the topic directory for actual partition versions (timestamped directories):
	// list all version directories in the topic directory.
	err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
		if !versionEntry.IsDirectory {
			return nil // Skip non-directories
		}

		// Parse the version timestamp from the directory name (e.g., "v2025-09-01-07-16-34")
		versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
		if parseErr != nil {
			// Skip directories that don't match the version format
			return nil
		}

		// Scan partition directories within this version
		versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
		return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
			if !partitionEntry.IsDirectory {
				return nil // Skip non-directories
			}

			// Parse the partition boundary from the directory name (e.g., "0000-0630")
			rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
			if rangeStart == rangeStop {
				return nil // Skip invalid partition names
			}

			// Create the partition object
			partition := topic.Partition{
				RangeStart: rangeStart,
				RangeStop:  rangeStop,
				RingSize:   topic.PartitionCount,
				UnixTimeNs: versionTime.UnixNano(),
			}
			allPartitions = append(allPartitions, partition)
			return nil
		})
	})
	if err != nil {
		return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
	}

	// If no partitions are found, return an empty slice (valid for newly created or empty topics)
	if len(allPartitions) == 0 {
		fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
		return []topic.Partition{}, nil
	}

	fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
	return allPartitions, nil
}

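// For reference, the on-disk layout that discoverTopicPartitions walks looks
// like this (illustrative paths, derived from the parsing logic above):
//
//	<topic dir>/
//	  v2025-09-01-07-16-34/   <- version directory (topic.ParseTopicVersion)
//	    0000-0630/            <- partition boundary (topic.ParsePartitionBoundary)
//	    0630-1260/
//	  v2025-09-02-10-00-00/
//	    ...
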
// scanPartitionHybrid scans a specific partition using the hybrid approach.
// This is where the magic happens - seamlessly reading ALL data sources:
//  1. Unflushed in-memory data from brokers (REAL-TIME)
//  2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
	results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
	return results, err
}

// scanPartitionHybridWithStats scans a specific partition using a streaming merge for memory efficiency.
//
// PERFORMANCE IMPROVEMENT: Uses a heap-based streaming merge instead of collecting all data and sorting:
//   - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
//   - Scalable: can handle large topics without LIMIT clauses efficiently
//   - Streaming: processes data as it arrives rather than buffering everything
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
	stats := &HybridScanStats{}

	// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
	unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
	if err != nil {
		// Don't fail the query if broker scanning fails, but provide a clear warning to the user.
		// This ensures users are aware that results may not include the most recent data.
		if isDebugMode(ctx) {
			fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
		} else {
			fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
			fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
		}
	} else if unflushedStats != nil {
		stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
		stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
		stats.BufferStartIndex = unflushedStats.BufferStartIndex
	}

	// Count live log files for statistics
	liveLogCount, err := hms.countLiveLogFiles(partition)
	if err != nil {
		// Don't fail the query, just log a warning
		fmt.Printf("Warning: Failed to count live log files: %v\n", err)
		liveLogCount = 0
	}
	stats.LiveLogFilesScanned = liveLogCount

	// STEP 2: Create streaming data sources for a memory-efficient merge
	var dataSources []StreamingDataSource

	// Add the unflushed data source (if we have unflushed results)
	if len(unflushedResults) > 0 {
		// Sort unflushed results by timestamp before creating the stream
		if len(unflushedResults) > 1 {
			hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
		}
		dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
	}

	// Add the streaming flushed data source (live logs + Parquet files)
	flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
	dataSources = append(dataSources, flushedDataSource)

	// STEP 3: Use the streaming merge for memory-efficient chronological ordering
	var results []HybridScanResult
	if len(dataSources) > 0 {
		// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT).
		// For LIMIT N OFFSET M, we need to collect at least N+M rows.
		scanLimit := options.Limit
		if options.Limit > 0 && options.Offset > 0 {
			scanLimit = options.Limit + options.Offset
		}
		mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
		if err != nil {
			return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
		}
		results = mergedResults
	}

	return results, stats, nil
}

// countLiveLogFiles counts the number of live log files in a partition for statistics
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
	partitionDir := topic.PartitionDir(hms.topic, partition)

	var fileCount int
	err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// List all files in the partition directory
		request := &filer_pb.ListEntriesRequest{
			Directory:          partitionDir,
			Prefix:             "",
			StartFromFileName:  "",
			InclusiveStartFrom: true,
			Limit:              10000, // reasonable limit for counting
		}

		stream, err := client.ListEntries(context.Background(), request)
		if err != nil {
			return err
		}

		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}

			// Count files that are not .parquet files (live log files).
			// Live log files typically have timestamps or are named like log files.
			fileName := resp.Entry.Name
			if !strings.HasSuffix(fileName, ".parquet") &&
				!strings.HasSuffix(fileName, ".offset") &&
				len(resp.Entry.Chunks) > 0 { // Has actual content
				fileCount++
			}
		}
		return nil
	})
	if err != nil {
		return 0, err
	}

	return fileCount, nil
}

// isControlEntry checks if a log entry is a control entry without actual data.
// Based on MQ system analysis, control entries are:
//  1. DataMessages with a populated Ctrl field (publisher close signals)
//  2. Entries with empty keys (as filtered by the subscriber)
//  3. Entries with no data
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
	// Skip entries with no data
	if len(logEntry.Data) == 0 {
		return true
	}

	// Skip entries with empty keys (same logic as the subscriber)
	if len(logEntry.Key) == 0 {
		return true
	}

	// Check if this is a DataMessage with the control field populated
	dataMessage := &mq_pb.DataMessage{}
	if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
		// If it has a control field, it's a control message
		if dataMessage.Ctrl != nil {
			return true
		}
	}

	return false
}

// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue.
// This handles both:
//  1. Live log entries (raw message format)
//  2. Parquet entries (already in schema_pb.RecordValue format)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	// Try to unmarshal as a RecordValue first (Parquet format)
	recordValue := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
		// This is an archived message from Parquet files.
		// FIX: Add system columns from the LogEntry to the RecordValue.
		if recordValue.Fields == nil {
			recordValue.Fields = make(map[string]*schema_pb.Value)
		}
		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
		}
		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
		}
		return recordValue, "parquet_archive", nil
	}

	// If not a RecordValue, this is raw live message data - parse it with the schema
	return hms.parseRawMessageWithSchema(logEntry)
}

// parseRawMessageWithSchema parses raw live message data using the topic's schema.
// This provides proper type conversion and field mapping instead of treating everything as strings.
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
	recordValue := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Add system columns (always present)
	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
	}
	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
	}

	// Parse the message data based on the schema
	if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
		// Fallback: no schema available, treat as a single "data" field
		recordValue.Fields["data"] = &schema_pb.Value{
			Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
		}
		return recordValue, "live_log", nil
	}

	// Attempt schema-aware parsing.
	// Strategy 1: Try JSON parsing first (most common for live messages)
	if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
		// Successfully parsed as JSON, merge with system columns
		for fieldName, fieldValue := range parsedRecord.Fields {
			recordValue.Fields[fieldName] = fieldValue
		}
		return recordValue, "live_log", nil
	}

	// Strategy 2: Try protobuf parsing (binary messages)
	if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
		// Successfully parsed as protobuf, merge with system columns
		for fieldName, fieldValue := range parsedRecord.Fields {
			recordValue.Fields[fieldName] = fieldValue
		}
		return recordValue, "live_log", nil
	}

	// Strategy 3: Fall back to a single field holding the raw data.
	// If the schema has a single field, map the raw data to it with type conversion.
	if len(hms.recordSchema.Fields) == 1 {
		field := hms.recordSchema.Fields[0]
		convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
		if err == nil {
			recordValue.Fields[field.Name] = convertedValue
			return recordValue, "live_log", nil
		}
	}

	// Final fallback: treat as a string data field
	recordValue.Fields["data"] = &schema_pb.Value{
		Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
	}
	return recordValue, "live_log", nil
}

// parseJSONMessage attempts to parse raw data as JSON and map it to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
	// Try to parse as JSON
	var jsonData map[string]interface{}
	if err := json.Unmarshal(data, &jsonData); err != nil {
		return nil, fmt.Errorf("not valid JSON: %v", err)
	}

	recordValue := &schema_pb.RecordValue{
		Fields: make(map[string]*schema_pb.Value),
	}

	// Map JSON fields to schema fields
	for _, schemaField := range hms.recordSchema.Fields {
		fieldName := schemaField.Name
		if jsonValue, exists := jsonData[fieldName]; exists {
			schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
			if err != nil {
				// Skip the field on conversion error but continue with other fields
				continue
			}
			recordValue.Fields[fieldName] = schemaValue
		}
	}

	return recordValue, nil
}

// parseProtobufMessage attempts to parse raw data as a protobuf RecordValue
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
	// This might be a raw protobuf message that didn't parse correctly the first time.
	// Try alternative protobuf unmarshaling approaches.
	recordValue := &schema_pb.RecordValue{}

	// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
	if err := proto.Unmarshal(data, recordValue); err == nil {
		return recordValue, nil
	}

	// Strategy 2: Check if it's a different protobuf message type.
	// For now, return an error, as this requires more specific knowledge of MQ message formats.
	return nil, fmt.Errorf("could not parse as protobuf RecordValue")
}

// convertRawDataToSchemaValue converts raw bytes to a specific schema type
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
	dataStr := string(data)
	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		scalarType := fieldType.GetScalarType()
		switch scalarType {
		case schema_pb.ScalarType_STRING:
			return &schema_pb.Value{
				Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
			}, nil
		case schema_pb.ScalarType_INT32:
			if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
				}, nil
			}
		case schema_pb.ScalarType_INT64:
			if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int64Value{Int64Value: val},
				}, nil
			}
		case schema_pb.ScalarType_FLOAT:
			if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
				}, nil
			}
		case schema_pb.ScalarType_DOUBLE:
			if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
				}, nil
			}
		case schema_pb.ScalarType_BOOL:
			lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
			if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: true},
				}, nil
			} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: false},
				}, nil
			}
		case schema_pb.ScalarType_BYTES:
			return &schema_pb.Value{
				Kind: &schema_pb.Value_BytesValue{BytesValue: data},
			}, nil
		}
	}
	return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
}

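// Illustrative behavior of convertRawDataToSchemaValue (assumed inputs, derived
// from the cases above): raw bytes "42" with an INT32 schema type become
// Int32Value(42); " true " with a BOOL type becomes BoolValue(true) after
// TrimSpace/ToLower; an unparseable value falls through the switch and returns
// the "unsupported type conversion" error.
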
// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on the schema type
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		scalarType := fieldType.GetScalarType()
		switch scalarType {
		case schema_pb.ScalarType_STRING:
			if str, ok := jsonValue.(string); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_StringValue{StringValue: str},
				}, nil
			}
			// Convert other types to string
			return &schema_pb.Value{
				Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
			}, nil
		case schema_pb.ScalarType_INT32:
			if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
				}, nil
			}
		case schema_pb.ScalarType_INT64:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
				}, nil
			}
		case schema_pb.ScalarType_FLOAT:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
				}, nil
			}
		case schema_pb.ScalarType_DOUBLE:
			if num, ok := jsonValue.(float64); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
				}, nil
			}
		case schema_pb.ScalarType_BOOL:
			if boolVal, ok := jsonValue.(bool); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
				}, nil
			}
		case schema_pb.ScalarType_BYTES:
			if str, ok := jsonValue.(string); ok {
				return &schema_pb.Value{
					Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
				}, nil
			}
		}
	}
	return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
}

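// Caveat worth noting (a property of encoding/json, not stated in the original
// source): json.Unmarshal decodes every JSON number into float64, so int64
// values above 2^53 lose precision before the int64(num) conversion here. For
// example, {"id": 9007199254740993} round-trips as 9007199254740992.
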
// ConvertToSQLResult converts HybridScanResults to SQL query results
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
	if len(results) == 0 {
		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Determine columns if not specified
	if len(columns) == 0 {
		columnSet := make(map[string]bool)
		for _, result := range results {
			for columnName := range result.Values {
				columnSet[columnName] = true
			}
		}
		columns = make([]string, 0, len(columnSet))
		for columnName := range columnSet {
			columns = append(columns, columnName)
		}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			switch columnName {
			case SW_COLUMN_NAME_SOURCE:
				row[j] = sqltypes.NewVarChar(result.Source)
			case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
				// Format the timestamp as a proper timestamp type instead of raw nanoseconds
				row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
			case SW_COLUMN_NAME_KEY:
				row[j] = sqltypes.NewVarBinary(string(result.Key))
			default:
				if value, exists := result.Values[columnName]; exists {
					row[j] = convertSchemaValueToSQL(value)
				} else {
					row[j] = sqltypes.NULL
				}
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}

// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries.
// It combines auto-discovered columns (from *) with explicitly requested columns.
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
	if len(results) == 0 {
		// For empty results, combine auto-discovered columns with explicit ones
		columnSet := make(map[string]bool)
		// Add explicit columns first
		for _, col := range explicitColumns {
			columnSet[col] = true
		}
		// Build the final column list
		columns := make([]string, 0, len(columnSet))
		for col := range columnSet {
			columns = append(columns, col)
		}
		return &QueryResult{
			Columns:  columns,
			Rows:     [][]sqltypes.Value{},
			Database: hms.topic.Namespace,
			Table:    hms.topic.Name,
		}
	}

	// Auto-discover columns from the data (like SELECT *)
	autoColumns := make(map[string]bool)
	for _, result := range results {
		for columnName := range result.Values {
			autoColumns[columnName] = true
		}
	}

	// Combine auto-discovered and explicit columns
	columnSet := make(map[string]bool)
	// Add auto-discovered columns first (regular data columns)
	for col := range autoColumns {
		columnSet[col] = true
	}
	// Add explicit columns (may include system columns like _source)
	for _, col := range explicitColumns {
		columnSet[col] = true
	}

	// Build the final column list
	columns := make([]string, 0, len(columnSet))
	for col := range columnSet {
		columns = append(columns, col)
	}

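	// NOTE (observation, not in the original source): Go map iteration order is
	// unspecified, so the column order produced here can vary between runs;
	// sorting `columns` would make the output deterministic if that matters.
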
	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			switch columnName {
			case SW_COLUMN_NAME_TIMESTAMP:
				row[j] = sqltypes.NewInt64(result.Timestamp)
			case SW_COLUMN_NAME_KEY:
				row[j] = sqltypes.NewVarBinary(string(result.Key))
			case SW_COLUMN_NAME_SOURCE:
				row[j] = sqltypes.NewVarChar(result.Source)
			default:
				// Regular data column
				if value, exists := result.Values[columnName]; exists {
					row[j] = convertSchemaValueToSQL(value)
				} else {
					row[j] = sqltypes.NULL
				}
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns:  columns,
		Rows:     rows,
		Database: hms.topic.Namespace,
		Table:    hms.topic.Name,
	}
}

// ReadParquetStatistics efficiently reads column statistics from parquet files
// without scanning the full file content - it uses parquet's built-in metadata.
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
	var fileStats []*ParquetFileStats

	// Use the same chunk cache as the logstore package
	chunkCache := chunk_cache.NewChunkCacheInMemory(256)
	lookupFileIdFn := filer.LookupFn(h.filerClient)

	err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Only process parquet files
		if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}

		// Extract statistics from this parquet file
		stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
		if err != nil {
			// Log the error but continue processing other files
			fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
			return nil
		}
		if stats != nil {
			fileStats = append(fileStats, stats)
		}
		return nil
	})
	return fileStats, err
}

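// Illustrative usage sketch (the partition path format is an assumption,
// mirroring the layout documented at discoverTopicPartitions):
//
//	fileStats, err := scanner.ReadParquetStatistics("/topics/default/user_events/v2025-09-01-07-16-34/0000-0630")
//	if err != nil {
//		return err
//	}
//	for _, fs := range fileStats {
//		if s, ok := fs.ColumnStats["user_id"]; ok {
//			_ = s.MinValue // min/max enable predicate pushdown without reading row data
//		}
//	}
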
// extractParquetFileStats extracts column statistics from a single parquet file
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
	// Create a reader for the parquet file
	fileSize := filer.FileSize(entry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
	readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
	readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))

	// Create a parquet reader - this only reads metadata, not data
	parquetReader := parquet.NewReader(readerAt)
	defer parquetReader.Close()

	fileView := parquetReader.File()
	fileStats := &ParquetFileStats{
		FileName:    entry.Name,
		RowCount:    fileView.NumRows(),
		ColumnStats: make(map[string]*ParquetColumnStats),
	}

	// Populate the optional min/max from filer extended attributes (the writer stores ns timestamps)
	if entry != nil && entry.Extended != nil {
		if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
			fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
		}
		if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
			fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
		}
	}

	// Get schema information
	schema := fileView.Schema()

	// Process each row group
	rowGroups := fileView.RowGroups()
	for _, rowGroup := range rowGroups {
		columnChunks := rowGroup.ColumnChunks()

		// Process each column chunk
		for i, chunk := range columnChunks {
			// Get the column name from the schema
			columnName := h.getColumnNameFromSchema(schema, i)
			if columnName == "" {
				continue
			}

			// Try to get column statistics
			columnIndex, err := chunk.ColumnIndex()
			if err != nil {
				// No column index available - skip this column
				continue
			}

			// Extract the min from the first page and the max from the last page (for simplicity).
			// A more sophisticated implementation could aggregate across all pages.
			numPages := columnIndex.NumPages()
			if numPages == 0 {
				continue
			}
			minParquetValue := columnIndex.MinValue(0)
			maxParquetValue := columnIndex.MaxValue(numPages - 1)
			nullCount := int64(0)

			// Aggregate null counts across all pages
			for pageIdx := 0; pageIdx < numPages; pageIdx++ {
				nullCount += columnIndex.NullCount(pageIdx)
			}

			// Convert parquet values to schema_pb.Value
			minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
			if err != nil {
				continue
			}
			maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
			if err != nil {
				continue
			}

			// Store the column statistics (aggregate across row groups if the column already exists)
			if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
				// Update the existing statistics
				if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
					existingStats.MinValue = minValue
				}
				if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
					existingStats.MaxValue = maxValue
				}
				existingStats.NullCount += nullCount
			} else {
				// Create new column statistics
				fileStats.ColumnStats[columnName] = &ParquetColumnStats{
					ColumnName: columnName,
					MinValue:   minValue,
					MaxValue:   maxValue,
					NullCount:  nullCount,
					RowCount:   rowGroup.NumRows(),
				}
			}
		}
	}

	return fileStats, nil
}

// getColumnNameFromSchema extracts a column name from the parquet schema by index
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
	// Get the leaf columns in order
	var columnNames []string
	h.collectColumnNames(schema.Fields(), &columnNames)

	if columnIndex >= 0 && columnIndex < len(columnNames) {
		return columnNames[columnIndex]
	}
	return ""
}

// collectColumnNames recursively collects leaf column names from the schema
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
	for _, field := range fields {
		if len(field.Fields()) == 0 {
			// This is a leaf field (no sub-fields)
			*names = append(*names, field.Name())
		} else {
			// This is a group - recurse
			h.collectColumnNames(field.Fields(), names)
		}
	}
}

// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
	switch pv.Kind() {
	case parquet.Boolean:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
	case parquet.Int32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
	case parquet.Int64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
	case parquet.Float:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
	case parquet.Double:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
	case parquet.ByteArray:
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
	default:
		return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
	}
}

  1094. // compareSchemaValues compares two schema_pb.Value objects
  1095. func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
  1096. if v1 == nil && v2 == nil {
  1097. return 0
  1098. }
  1099. if v1 == nil {
  1100. return -1
  1101. }
  1102. if v2 == nil {
  1103. return 1
  1104. }
  1105. // Extract raw values and compare
  1106. raw1 := h.extractRawValueFromSchema(v1)
  1107. raw2 := h.extractRawValueFromSchema(v2)
  1108. return h.compareRawValues(raw1, raw2)
  1109. }
  1110. // extractRawValueFromSchema extracts the raw value from schema_pb.Value
  1111. func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
  1112. switch v := value.Kind.(type) {
  1113. case *schema_pb.Value_BoolValue:
  1114. return v.BoolValue
  1115. case *schema_pb.Value_Int32Value:
  1116. return v.Int32Value
  1117. case *schema_pb.Value_Int64Value:
  1118. return v.Int64Value
  1119. case *schema_pb.Value_FloatValue:
  1120. return v.FloatValue
  1121. case *schema_pb.Value_DoubleValue:
  1122. return v.DoubleValue
  1123. case *schema_pb.Value_BytesValue:
  1124. return string(v.BytesValue) // Convert to string for comparison
  1125. case *schema_pb.Value_StringValue:
  1126. return v.StringValue
  1127. }
  1128. return nil
  1129. }
// compareRawValues compares two raw values, returning -1, 0, or 1
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
	// Handle nil cases: nil sorts before any non-nil value
	if v1 == nil && v2 == nil {
		return 0
	}
	if v1 == nil {
		return -1
	}
	if v2 == nil {
		return 1
	}

	// Compare based on type. Note that for floats, NaN is neither less than
	// nor greater than any value, so NaN compares as equal here.
	switch val1 := v1.(type) {
	case bool:
		if val2, ok := v2.(bool); ok {
			if val1 == val2 {
				return 0
			}
			if val1 {
				return 1
			}
			return -1
		}
	case int32:
		if val2, ok := v2.(int32); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case int64:
		if val2, ok := v2.(int64); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case float32:
		if val2, ok := v2.(float32); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case float64:
		if val2, ok := v2.(float64); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	case string:
		if val2, ok := v2.(string); ok {
			if val1 < val2 {
				return -1
			} else if val1 > val2 {
				return 1
			}
			return 0
		}
	}

	// Fallback for mismatched or unexpected types: compare string representations
	str1 := fmt.Sprintf("%v", v1)
	str2 := fmt.Sprintf("%v", v2)
	if str1 < str2 {
		return -1
	} else if str1 > str2 {
		return 1
	}
	return 0
}
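// Behavior sketch for the fallback path: mixed numeric widths are not coerced,
// they fall through to string comparison, so callers should normalize types
// upstream when exact numeric ordering matters.
//
//	h.compareRawValues(int64(2), int64(10)) // -1 (numeric compare)
//	h.compareRawValues(int32(2), int64(10)) // 1  ("2" > "10" lexicographically)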
// streamingMerge merges multiple sorted data sources using a heap-based approach.
// This provides memory-efficient merging without loading all data into memory.
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
	if len(dataSources) == 0 {
		return nil, nil
	}

	var results []HybridScanResult
	mergeHeap := &StreamingMergeHeap{}
	heap.Init(mergeHeap)

	// Seed the heap with the first item from each data source
	for i, source := range dataSources {
		if source.HasMore() {
			result, err := source.Next()
			if err != nil {
				// Close all sources and return the error
				for _, s := range dataSources {
					s.Close()
				}
				return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
			}
			if result != nil {
				heap.Push(mergeHeap, &StreamingMergeItem{
					Result:     result,
					SourceID:   i,
					DataSource: source,
				})
			}
		}
	}

	// Emit results in chronological order
	for mergeHeap.Len() > 0 {
		// Pop the next chronologically ordered result
		item := heap.Pop(mergeHeap).(*StreamingMergeItem)
		results = append(results, *item.Result)

		// Check the limit
		if limit > 0 && len(results) >= limit {
			break
		}

		// Refill the heap from the same data source
		if item.DataSource.HasMore() {
			nextResult, err := item.DataSource.Next()
			if err != nil {
				// Log the error but continue with the remaining sources
				fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
			} else if nextResult != nil {
				heap.Push(mergeHeap, &StreamingMergeItem{
					Result:     nextResult,
					SourceID:   item.SourceID,
					DataSource: item.DataSource,
				})
			}
		}
	}

	// Close all data sources
	for _, source := range dataSources {
		source.Close()
	}

	return results, nil
}
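// Usage sketch: combining an in-memory slice of unflushed rows with a streaming
// flushed source; the heap keeps the merged output in timestamp order. The
// variable names (unflushedResults, partition, options) are illustrative.
//
//	sources := []StreamingDataSource{
//		NewSliceDataSource(unflushedResults),
//		NewStreamingFlushedDataSource(hms, partition, options),
//	}
//	merged, err := hms.streamingMerge(sources, 100) // at most 100 results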
// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource.
// It is used for unflushed data that is already loaded into memory.
type SliceDataSource struct {
	results []HybridScanResult
	index   int
}

func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
	return &SliceDataSource{
		results: results,
		index:   0,
	}
}

func (s *SliceDataSource) Next() (*HybridScanResult, error) {
	if s.index >= len(s.results) {
		return nil, nil
	}
	result := &s.results[s.index]
	s.index++
	return result, nil
}

func (s *SliceDataSource) HasMore() bool {
	return s.index < len(s.results)
}

func (s *SliceDataSource) Close() error {
	return nil // Nothing to clean up for a slice-based source
}
// StreamingFlushedDataSource provides streaming access to flushed data
type StreamingFlushedDataSource struct {
	hms          *HybridMessageScanner
	partition    topic.Partition
	options      HybridScanOptions
	mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
	resultChan   chan *HybridScanResult
	errorChan    chan error
	doneChan     chan struct{}
	started      bool
	finished     bool
	closed       int32        // set atomically by Close to signal shutdown exactly once
	mu           sync.RWMutex // guards started and finished
}

func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
	mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)

	return &StreamingFlushedDataSource{
		hms:          hms,
		partition:    partition,
		options:      options,
		mergedReadFn: mergedReadFn,
		resultChan:   make(chan *HybridScanResult, 100), // Buffered for better throughput
		errorChan:    make(chan error, 1),
		doneChan:     make(chan struct{}),
		started:      false,
		finished:     false,
	}
}
func (s *StreamingFlushedDataSource) startStreaming() {
	// Idempotent start, guarded by s.mu so concurrent callers race safely
	s.mu.Lock()
	if s.started {
		s.mu.Unlock()
		return
	}
	s.started = true
	s.mu.Unlock()

	go func() {
		defer func() {
			// This goroutine is the only sender, so it alone closes the data
			// channels on exit; Close() signals shutdown via doneChan only.
			close(s.resultChan)
			close(s.errorChan)
			s.mu.Lock()
			s.finished = true
			s.mu.Unlock()
		}()

		// Set up the time range for scanning
		startTime := time.Unix(0, s.options.StartTimeNs)
		if s.options.StartTimeNs == 0 {
			startTime = time.Unix(0, 0)
		}

		stopTsNs := s.options.StopTimeNs
		// For SQL queries, stopTsNs == 0 means "no stop time restriction".
		// This differs from message queue consumers, which want to stop at "now".
		// SQL context is detected by the presence of a predicate function.
		if stopTsNs == 0 && s.options.Predicate == nil {
			// Only cap at the current time for non-SQL readers (message queue consumers)
			stopTsNs = time.Now().UnixNano()
		}
		// If stopTsNs is still 0, this is a SQL query that wants an unrestricted scan.

		// Per-message processing function
		eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
			// Skip control entries that carry no actual data
			if s.hms.isControlEntry(logEntry) {
				return false, nil
			}

			// Convert the log entry to schema_pb.RecordValue for consistent processing
			recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
			if convertErr != nil {
				return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
			}

			// Apply predicate filtering (WHERE clause)
			if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
				return false, nil // Skip this message
			}

			// Extract system columns
			timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

			// Apply column projection
			values := make(map[string]*schema_pb.Value)
			if len(s.options.Columns) == 0 {
				// Select all columns (excluding system columns from the user view)
				for name, value := range recordValue.Fields {
					if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
						values[name] = value
					}
				}
			} else {
				// Select only the requested columns
				for _, columnName := range s.options.Columns {
					if value, exists := recordValue.Fields[columnName]; exists {
						values[columnName] = value
					}
				}
			}

			result := &HybridScanResult{
				Values:    values,
				Timestamp: timestamp,
				Key:       key,
				Source:    source,
			}

			// Hand the result to the consumer, or stop once Close() has been
			// called. Because resultChan is closed only after this goroutine
			// exits, the send can never panic on a closed channel.
			select {
			case s.resultChan <- result:
				return false, nil
			case <-s.doneChan:
				return true, nil
			}
		}

		// Start scanning from the requested position
		startPosition := log_buffer.MessagePosition{Time: startTime}
		_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
		if err != nil {
			select {
			case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
			case <-s.doneChan:
			}
		}
	}()
}
func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
	s.startStreaming() // idempotent; guarded internally by s.mu

	select {
	case result, ok := <-s.resultChan:
		if !ok {
			// The stream is exhausted; surface any buffered error first.
			select {
			case err := <-s.errorChan:
				if err != nil {
					return nil, err
				}
			default:
			}
			return nil, nil // No more results
		}
		return result, nil
	case err := <-s.errorChan:
		if err != nil {
			return nil, err
		}
		return nil, nil // errorChan closed without an error
	case <-s.doneChan:
		return nil, nil
	}
}
func (s *StreamingFlushedDataSource) HasMore() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if !s.started {
		return true // Not started yet, so there is potentially data
	}
	return !s.finished || len(s.resultChan) > 0
}
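// Consumer loop sketch: drain a flushed source until exhaustion. process is a
// hypothetical callback standing in for whatever the caller does with a row.
//
//	src := NewStreamingFlushedDataSource(hms, partition, options)
//	defer src.Close()
//	for src.HasMore() {
//		r, err := src.Next()
//		if err != nil {
//			return err
//		}
//		if r == nil {
//			break // stream drained
//		}
//		process(r)
//	}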
func (s *StreamingFlushedDataSource) Close() error {
	// Signal shutdown exactly once. Only the producer goroutine closes
	// resultChan and errorChan (it is the sole sender), so closing just
	// doneChan here avoids a send-on-closed-channel panic.
	if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
		close(s.doneChan)
	}
	return nil
}
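// Compile-time assertions (a common Go idiom) that both implementations keep
// satisfying the StreamingDataSource interface consumed by streamingMerge.
var (
	_ StreamingDataSource = (*SliceDataSource)(nil)
	_ StreamingDataSource = (*StreamingFlushedDataSource)(nil)
)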
// mergeSort sorts a HybridScanResult slice by timestamp using the merge sort algorithm
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
	if left < right {
		mid := left + (right-left)/2

		// Recursively sort both halves
		hms.mergeSort(results, left, mid)
		hms.mergeSort(results, mid+1, right)

		// Merge the sorted halves
		hms.merge(results, left, mid, right)
	}
}
// merge combines two sorted subarrays into a single sorted span results[left..right]
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
	// Copy the two halves into temporary arrays
	leftArray := make([]HybridScanResult, mid-left+1)
	rightArray := make([]HybridScanResult, right-mid)
	copy(leftArray, results[left:mid+1])
	copy(rightArray, results[mid+1:right+1])

	// Merge the temporary arrays back into results[left..right];
	// <= on equal timestamps keeps the sort stable
	i, j, k := 0, 0, left
	for i < len(leftArray) && j < len(rightArray) {
		if leftArray[i].Timestamp <= rightArray[j].Timestamp {
			results[k] = leftArray[i]
			i++
		} else {
			results[k] = rightArray[j]
			j++
		}
		k++
	}

	// Copy any remaining elements of leftArray
	for i < len(leftArray) {
		results[k] = leftArray[i]
		i++
		k++
	}

	// Copy any remaining elements of rightArray
	for j < len(rightArray) {
		results[k] = rightArray[j]
		j++
		k++
	}
}
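// Usage sketch: sort an entire result slice in place by timestamp. The explicit
// merge sort is stable (equal timestamps keep their relative order), matching
// what sort.SliceStable from the standard library would provide.
//
//	if len(results) > 1 {
//		hms.mergeSort(results, 0, len(results)-1)
//	}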