package engine

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)

// AggregationSpec defines an aggregation function to be computed
type AggregationSpec struct {
	Function string // COUNT, SUM, AVG, MIN, MAX
	Column   string // Column name, or "*" for COUNT(*)
	Alias    string // Optional alias for the result column
	Distinct bool   // Support for DISTINCT keyword
}
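
// As an illustrative sketch (table and column names here are hypothetical), a
// query such as
//
//	SELECT COUNT(*), MAX(user_id) AS max_user FROM events
//
// is represented by two specs:
//
//	[]AggregationSpec{
//		{Function: "COUNT", Column: "*"},
//		{Function: "MAX", Column: "user_id", Alias: "max_user"},
//	}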

// AggregationResult holds the computed result of an aggregation
type AggregationResult struct {
	Count int64
	Sum   float64
	Min   interface{}
	Max   interface{}
}

// AggregationStrategy represents the strategy for executing aggregations
type AggregationStrategy struct {
	CanUseFastPath   bool
	Reason           string
	UnsupportedSpecs []AggregationSpec
}

// TopicDataSources represents the data sources available for a topic
type TopicDataSources struct {
	ParquetFiles         map[string][]*ParquetFileStats // partitionPath -> parquet file stats
	ParquetRowCount      int64
	LiveLogRowCount      int64
	LiveLogFilesCount    int // Total count of live log files across all partitions
	PartitionsCount      int
	BrokerUnflushedCount int64
}

// FastPathOptimizer handles fast path aggregation optimization decisions
type FastPathOptimizer struct {
	engine *SQLEngine
}

// NewFastPathOptimizer creates a new fast path optimizer
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
	return &FastPathOptimizer{engine: engine}
}

// DetermineStrategy analyzes the aggregations and determines whether the fast path can be used
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
	strategy := AggregationStrategy{
		CanUseFastPath:   true,
		Reason:           "all_aggregations_supported",
		UnsupportedSpecs: []AggregationSpec{},
	}
	for _, spec := range aggregations {
		if !opt.engine.canUseParquetStatsForAggregation(spec) {
			strategy.CanUseFastPath = false
			strategy.Reason = "unsupported_aggregation_functions"
			strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
		}
	}
	return strategy
}
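
// A minimal usage sketch (assuming an initialized *SQLEngine named engine and a
// parsed []AggregationSpec named aggregations; not part of the original API docs):
//
//	optimizer := NewFastPathOptimizer(engine)
//	strategy := optimizer.DetermineStrategy(aggregations)
//	if !strategy.CanUseFastPath {
//		// strategy.UnsupportedSpecs lists the offending aggregations;
//		// fall back to a full table scan instead.
//	}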

// CollectDataSources gathers information about the available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
	return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
}

// CollectDataSourcesWithTimeFilter gathers information about the available data sources
// for a topic, with optional time filtering to skip irrelevant parquet files
func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
	dataSources := &TopicDataSources{
		ParquetFiles:      make(map[string][]*ParquetFileStats),
		ParquetRowCount:   0,
		LiveLogRowCount:   0,
		LiveLogFilesCount: 0,
		PartitionsCount:   0,
	}

	if isDebugMode(ctx) {
		fmt.Printf("Collecting data sources for: %s/%s\n", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	}

	// Discover partitions for the topic
	partitionPaths, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		if isDebugMode(ctx) {
			fmt.Printf("ERROR: Partition discovery failed: %v\n", err)
		}
		return dataSources, DataSourceError{
			Source: "partition_discovery",
			Cause:  err,
		}
	}
	if isDebugMode(ctx) {
		fmt.Printf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths)
	}

	// Collect stats from each partition.
	// Note: discoverTopicPartitions always returns absolute paths starting with "/topics/"
	for _, partitionPath := range partitionPaths {
		if isDebugMode(ctx) {
			fmt.Printf("\nProcessing partition: %s\n", partitionPath)
		}

		// Read parquet file statistics
		parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
		if err != nil {
			if isDebugMode(ctx) {
				fmt.Printf(" ERROR: Failed to read parquet statistics: %v\n", err)
			}
		} else if len(parquetStats) == 0 {
			if isDebugMode(ctx) {
				fmt.Printf(" No parquet files found in partition\n")
			}
		} else {
			// Prune by time range using parquet column statistics
			filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
			dataSources.ParquetFiles[partitionPath] = filtered
			partitionParquetRows := int64(0)
			for _, stat := range filtered {
				partitionParquetRows += stat.RowCount
				dataSources.ParquetRowCount += stat.RowCount
			}
			if isDebugMode(ctx) {
				fmt.Printf(" Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
			}
		}

		// Count live log rows (excluding log files already converted to parquet)
		parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
		liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
		if liveLogErr != nil {
			if isDebugMode(ctx) {
				fmt.Printf(" ERROR: Failed to count live log rows: %v\n", liveLogErr)
			}
		} else {
			dataSources.LiveLogRowCount += liveLogCount
			if isDebugMode(ctx) {
				fmt.Printf(" Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources))
			}
		}

		// Count live log files for the partition with proper range values.
		// Extract the partition name from the absolute path (e.g., "0000-2520" from "/topics/.../v2025.../0000-2520")
		partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:]
		partitionParts := strings.Split(partitionName, "-")
		if len(partitionParts) == 2 {
			rangeStart, err1 := strconv.Atoi(partitionParts[0])
			rangeStop, err2 := strconv.Atoi(partitionParts[1])
			if err1 == nil && err2 == nil {
				partition := topic.Partition{
					RangeStart: int32(rangeStart),
					RangeStop:  int32(rangeStop),
				}
				liveLogFileCount, err := hybridScanner.countLiveLogFiles(partition)
				if err == nil {
					dataSources.LiveLogFilesCount += liveLogFileCount
				}

				// Count broker unflushed messages for this partition
				if hybridScanner.brokerClient != nil {
					entries, err := hybridScanner.brokerClient.GetUnflushedMessages(ctx, hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition, 0)
					if err == nil {
						dataSources.BrokerUnflushedCount += int64(len(entries))
						if isDebugMode(ctx) {
							fmt.Printf(" Found %d unflushed broker messages\n", len(entries))
						}
					} else if isDebugMode(ctx) {
						fmt.Printf(" ERROR: Failed to get unflushed broker messages: %v\n", err)
					}
				}
			}
		}
	}

	dataSources.PartitionsCount = len(partitionPaths)

	if isDebugMode(ctx) {
		fmt.Printf("Data sources collected: %d partitions, %d parquet rows, %d live log rows, %d broker buffer rows\n",
			dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount)
	}

	return dataSources, nil
}
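
// Note that the three row counts collected above are intended to be disjoint by
// construction: countLiveLogRowsExcludingParquetSources skips log files that were
// already compacted into parquet, and the broker count covers only messages not yet
// flushed to disk. This is what allows the fast path to answer COUNT(*) as
//
//	ParquetRowCount + LiveLogRowCount + BrokerUnflushedCount
//
// without double counting.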

// AggregationComputer handles the computation of aggregations using the fast path
type AggregationComputer struct {
	engine *SQLEngine
}

// NewAggregationComputer creates a new aggregation computer
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
	return &AggregationComputer{engine: engine}
}

// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
func (comp *AggregationComputer) ComputeFastPathAggregations(
	ctx context.Context,
	aggregations []AggregationSpec,
	dataSources *TopicDataSources,
	partitions []string,
) ([]AggregationResult, error) {
	aggResults := make([]AggregationResult, len(aggregations))

	for i, spec := range aggregations {
		switch spec.Function {
		case FuncCOUNT:
			if spec.Column == "*" {
				aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
			} else {
				// For specific columns, we might need to account for NULLs in the future
				aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
			}
		case FuncMIN:
			globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			aggResults[i].Min = globalMin
		case FuncMAX:
			globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
			if err != nil {
				return nil, AggregationError{
					Operation: spec.Function,
					Column:    spec.Column,
					Cause:     err,
				}
			}
			aggResults[i].Max = globalMax
		default:
			return nil, OptimizationError{
				Strategy: "fast_path_aggregation",
				Reason:   fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
			}
		}
	}

	return aggResults, nil
}

// computeGlobalMin computes the global minimum value across all data sources
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMin interface{}
	var globalMinValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get the minimum from parquet statistics
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Look up the column: exact match first, then case-insensitive
			var colStats *ParquetColumnStats
			var found bool
			if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
				colStats = stats
				found = true
			} else {
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						found = true
						break
					}
				}
			}
			if found && colStats != nil && colStats.MinValue != nil {
				if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
					globalMinValue = colStats.MinValue
					extractedValue := comp.engine.extractRawValue(colStats.MinValue)
					if extractedValue != nil {
						globalMin = extractedValue
						hasParquetStats = true
					}
				}
			}
		}
	}

	// Step 2: Get the minimum from live log data (only if live logs exist)
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}
			liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}
			if liveLogMin != nil {
				if globalMin == nil {
					globalMin = liveLogMin
					// Keep the schema value in sync so later comparisons have a baseline
					globalMinValue = comp.engine.convertRawValueToSchemaValue(liveLogMin)
				} else {
					liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
					if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
						globalMin = liveLogMin
						globalMinValue = liveLogSchemaValue
					}
				}
			}
		}
	}

	// Step 3: Fall back to system columns if no regular data was found
	if globalMin == nil && !hasParquetStats {
		globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
	}

	return globalMin, nil
}

// computeGlobalMax computes the global maximum value across all data sources
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
	var globalMax interface{}
	var globalMaxValue *schema_pb.Value
	hasParquetStats := false

	// Step 1: Get the maximum from parquet statistics
	for _, fileStats := range dataSources.ParquetFiles {
		for _, fileStat := range fileStats {
			// Look up the column: exact match first, then case-insensitive
			var colStats *ParquetColumnStats
			var found bool
			if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
				colStats = stats
				found = true
			} else {
				for colName, stats := range fileStat.ColumnStats {
					if strings.EqualFold(colName, spec.Column) {
						colStats = stats
						found = true
						break
					}
				}
			}
			if found && colStats != nil && colStats.MaxValue != nil {
				if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
					globalMaxValue = colStats.MaxValue
					extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
					if extractedValue != nil {
						globalMax = extractedValue
						hasParquetStats = true
					}
				}
			}
		}
	}

	// Step 2: Get the maximum from live log data (only if live logs exist)
	if dataSources.LiveLogRowCount > 0 {
		for _, partition := range partitions {
			partitionParquetSources := make(map[string]bool)
			if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
				partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
			}
			_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
			if err != nil {
				continue // Skip partitions with errors
			}
			if liveLogMax != nil {
				if globalMax == nil {
					globalMax = liveLogMax
					// Keep the schema value in sync so later comparisons have a baseline
					globalMaxValue = comp.engine.convertRawValueToSchemaValue(liveLogMax)
				} else {
					liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
					if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
						globalMax = liveLogMax
						globalMaxValue = liveLogSchemaValue
					}
				}
			}
		}
	}

	// Step 3: Fall back to system columns if no regular data was found
	if globalMax == nil && !hasParquetStats {
		globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
	}

	return globalMax, nil
}
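
// As a concrete illustration with hypothetical values: given two parquet files
// whose file-level column statistics report (min=3, max=17) and (min=5, max=42),
// plus a live log minimum of 2, computeGlobalMin returns 2 and computeGlobalMax
// returns 42 without reading any parquet row data, only statistics and live logs.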

// executeAggregationQuery handles SELECT queries with aggregation functions
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) {
	return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil)
}

// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates the execution plan
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// Parse LIMIT and OFFSET for the aggregation results (do this first).
	// Use -1 to distinguish "no LIMIT" from "LIMIT 0".
	limit := -1
	offset := 0
	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
		if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
			if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil {
				if limit64 > int64(math.MaxInt) || limit64 < 0 {
					return nil, fmt.Errorf("LIMIT value %d is out of range", limit64)
				}
				// Safe conversion after the bounds check
				limit = int(limit64)
			}
		}
	}
	if stmt.Limit != nil && stmt.Limit.Offset != nil {
		if offsetExpr, ok := stmt.Limit.Offset.(*SQLVal); ok && offsetExpr.Type == IntVal {
			if offset64, err := strconv.ParseInt(string(offsetExpr.Val), 10, 64); err == nil {
				if offset64 > int64(math.MaxInt) || offset64 < 0 {
					return nil, fmt.Errorf("OFFSET value %d is out of range", offset64)
				}
				// Safe conversion after the bounds check
				offset = int(offset64)
			}
		}
	}

	// Parse the WHERE clause into a row predicate
	var predicate func(*schema_pb.RecordValue) bool
	var err error
	if stmt.Where != nil {
		predicate, err = e.buildPredicate(stmt.Where.Expr)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Extract time filters and validate that the WHERE clause contains only time-based predicates
	startTimeNs, stopTimeNs := int64(0), int64(0)
	onlyTimePredicates := true
	if stmt.Where != nil {
		startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
	}

	// FAST PATH WITH TIME-BASED OPTIMIZATION:
	// Allow the fast path only for queries without a WHERE clause or with time-only
	// WHERE clauses. This prevents incorrect results when non-time predicates are present.
	canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
	if canAttemptFastPath {
		if isDebugMode(ctx) {
			if stmt.Where == nil {
				fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
			} else {
				fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
			}
		}
		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
		if canOptimize {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path optimization succeeded!\n")
			}
			return fastResult, nil
		}
		if isDebugMode(ctx) {
			fmt.Printf("Fast path optimization failed, falling back to slow path\n")
		}
	} else if isDebugMode(ctx) {
		fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
	}

	// SLOW PATH: Fall back to a full table scan
	if isDebugMode(ctx) {
		fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
	}

	// Extract the columns needed by the aggregations
	columnsNeeded := make(map[string]bool)
	for _, spec := range aggregations {
		if spec.Column != "*" {
			columnsNeeded[spec.Column] = true
		}
	}

	// Convert to a slice. If no specific columns are needed (COUNT(*) only),
	// leave scanColumns nil so the scan reads all columns.
	var scanColumns []string
	if len(columnsNeeded) > 0 {
		scanColumns = make([]string, 0, len(columnsNeeded))
		for col := range columnsNeeded {
			scanColumns = append(scanColumns, col)
		}
	}

	// Build scan options for the full table scan (aggregations need all data during scanning)
	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs,
		StopTimeNs:  stopTimeNs,
		Limit:       -1, // -1 means "no limit": aggregation needs every matching row
		Offset:      0,  // No offset during scanning; OFFSET applies to the final results
		Predicate:   predicate,
		Columns:     scanColumns, // Columns needed by the aggregation functions
	}
	debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION")

	// Execute the hybrid scan to get all matching records
	var results []HybridScanResult
	if plan != nil {
		// EXPLAIN mode - capture broker buffer stats
		var stats *HybridScanStats
		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
		// Populate the plan with broker buffer information
		if stats != nil {
			plan.BrokerBufferQueried = stats.BrokerBufferQueried
			plan.BrokerBufferMessages = stats.BrokerBufferMessages
			plan.BufferStartIndex = stats.BufferStartIndex
			// Add broker_buffer to the data sources if the buffer was queried
			// and is not already listed
			if stats.BrokerBufferQueried {
				hasBrokerBuffer := false
				for _, source := range plan.DataSources {
					if source == "broker_buffer" {
						hasBrokerBuffer = true
						break
					}
				}
				if !hasBrokerBuffer {
					plan.DataSources = append(plan.DataSources, "broker_buffer")
				}
			}
		}
	} else {
		// Normal mode - just get the results
		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	if isDebugMode(ctx) {
		fmt.Printf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results))
	}

	// Compute the aggregations
	aggResults := e.computeAggregations(results, aggregations)

	// Build the result set
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))
	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	// Apply OFFSET and LIMIT to the aggregation results.
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows.
	rows := [][]sqltypes.Value{row}
	if offset > 0 || limit >= 0 {
		// Handle LIMIT 0 first
		if limit == 0 {
			rows = [][]sqltypes.Value{}
		} else {
			// Apply OFFSET first
			if offset > 0 {
				if offset >= len(rows) {
					rows = [][]sqltypes.Value{}
				} else {
					rows = rows[offset:]
				}
			}
			// Apply LIMIT after OFFSET (only if limit > 0)
			if limit > 0 && len(rows) > limit {
				rows = rows[:limit]
			}
		}
	}
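
	// Because an aggregation without GROUP BY always yields exactly one row, the
	// logic above degenerates to three outcomes: LIMIT 0 or any OFFSET >= 1 returns
	// an empty set, and everything else returns the single aggregate row.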

	result := &QueryResult{
		Columns: columns,
		Rows:    rows,
	}

	// Build the execution tree for aggregation queries if a plan is provided
	if plan != nil {
		// Populate detailed plan information for the full scan (mirrors the fast path)
		e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
		plan.RootNode = e.buildExecutionTree(plan, stmt)
	}

	return result, nil
}

// populateFullScanPlanDetails populates detailed plan information for full scan queries.
// This provides consistency with the fast path execution plan details.
func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
	// plan.Details is initialized at the start of SELECT execution

	// Extract table information
	var database, tableName string
	if len(stmt.From) == 1 {
		if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
			if tableExpr, ok := table.Expr.(TableName); ok {
				tableName = tableExpr.Name.String()
				if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
					database = tableExpr.Qualifier.String()
				}
			}
		}
	}

	// Use the current database if none is specified
	if database == "" {
		database = e.catalog.currentDatabase
		if database == "" {
			database = "default"
		}
	}

	// Discover partitions and populate file details
	if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
		// Add partition paths to the execution plan details
		plan.Details["partition_paths"] = partitions
		// Populate detailed file information using the shared helper
		e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
	} else {
		// Record the discovery error in the plan for better diagnostics
		plan.Details["error_partition_discovery"] = discoverErr.Error()
	}
}

// tryFastParquetAggregation attempts to compute aggregations using a hybrid approach:
//   - use parquet metadata for parquet files
//   - count live log files for live data
//   - combine both for accurate per-partition results
//
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used.
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
}

// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates the execution plan if provided.
// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering).
// stmt: SELECT statement for column statistics pruning optimization (can be nil).
func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
	// Use the modular components
	optimizer := NewFastPathOptimizer(e)
	computer := NewAggregationComputer(e)

	// Step 1: Determine the strategy
	strategy := optimizer.DetermineStrategy(aggregations)
	if !strategy.CanUseFastPath {
		return nil, false
	}

	// Step 2: Collect data sources, with time filtering for parquet file pruning
	dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
	if err != nil {
		return nil, false
	}

	// Build the partition list for the aggregation computer.
	// Note: discoverTopicPartitions always returns absolute paths.
	partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		return nil, false
	}

	// Debug: Show the hybrid optimization results (only in explain mode)
	if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) {
		partitionsWithLiveLogs := 0
		if dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0 {
			partitionsWithLiveLogs = 1 // Simplified for now
		}
		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows + %d broker buffer rows from %d partitions\n",
			dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount, partitionsWithLiveLogs)
	}

	// Step 3: Compute aggregations using the fast path
	aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}

	// Step 3.5: Validate fast path results (safety check).
	// For simple COUNT(*) queries the computed count must equal the total from the
	// data sources; a mismatch in either direction suggests inconsistent logic, so
	// a single equality check covers the zero/non-zero special cases.
	if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" {
		totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
		countResult := aggResults[0].Count
		if isDebugMode(ctx) {
			fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows)
		}
		if countResult != totalRows {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows)
			}
			return nil, false
		}
		if isDebugMode(ctx) {
			fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult)
		}
	}

	// Step 4: Populate the execution plan if provided (for EXPLAIN queries)
	if plan != nil {
		builder := &ExecutionPlanBuilder{}
		// Create a minimal SELECT statement for the plan builder (avoids a nil pointer)
		stmt := &SelectStatement{}

		// Build the aggregation plan with the fast path strategy determined above
		aggPlan := builder.BuildAggregationPlan(stmt, aggregations, strategy, dataSources)

		// Copy the relevant fields to the main plan
		plan.ExecutionStrategy = aggPlan.ExecutionStrategy
		plan.DataSources = aggPlan.DataSources
		plan.OptimizationsUsed = aggPlan.OptimizationsUsed
		plan.PartitionsScanned = aggPlan.PartitionsScanned
		plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned
		plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned
		plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed
		plan.Aggregations = aggPlan.Aggregations

		// Indicate broker buffer participation for EXPLAIN tree rendering
		if dataSources.BrokerUnflushedCount > 0 {
			plan.BrokerBufferQueried = true
			plan.BrokerBufferMessages = int(dataSources.BrokerUnflushedCount)
		}

		// Merge details while preserving existing ones
		for key, value := range aggPlan.Details {
			plan.Details[key] = value
		}

		// Add file path information from the data collection
		plan.Details["partition_paths"] = partitions

		// Populate detailed file information using the shared helper, including time filters for pruning
		plan.Details[PlanDetailStartTimeNs] = startTimeNs
		plan.Details[PlanDetailStopTimeNs] = stopTimeNs
		e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)

		// Update the counts to match the discovered live log files
		if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
			dataSources.LiveLogFilesCount = len(liveLogFiles)
			plan.LiveLogFilesScanned = len(liveLogFiles)
		}

		// Ensure PartitionsScanned is set so the Statistics section appears
		if plan.PartitionsScanned == 0 && len(partitions) > 0 {
			plan.PartitionsScanned = len(partitions)
		}

		if isDebugMode(ctx) {
			fmt.Printf("Populated execution plan with fast path strategy\n")
		}
	}

	// Step 5: Build the final query result
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))
	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    [][]sqltypes.Value{row},
	}

	return result, true
}
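
// Taken together, a rough sketch of the control flow implemented above (helper
// names as defined in this package; error handling elided):
//
//	DetermineStrategy               -> reject SUM/AVG and other unsupported specs
//	CollectDataSourcesWithTimeFilter -> parquet stats + live log counts + broker buffer
//	ComputeFastPathAggregations     -> COUNT/MIN/MAX from metadata
//	validation                      -> COUNT(*) must equal the summed source counts,
//	                                   otherwise fall back to the full scan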

// computeAggregations computes aggregation results from a full table scan
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
	aggResults := make([]AggregationResult, len(aggregations))
	for i, spec := range aggregations {
		switch spec.Function {
		case FuncCOUNT:
			if spec.Column == "*" {
				aggResults[i].Count = int64(len(results))
			} else {
				count := int64(0)
				for _, result := range results {
					if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) {
						count++
					}
				}
				aggResults[i].Count = count
			}
		case FuncSUM:
			sum := float64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
					}
				}
			}
			aggResults[i].Sum = sum
		case FuncAVG:
			sum := float64(0)
			count := int64(0)
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if numValue := e.convertToNumber(value); numValue != nil {
						sum += *numValue
						count++
					}
				}
			}
			if count > 0 {
				aggResults[i].Sum = sum / float64(count) // Store the average in the Sum field
				aggResults[i].Count = count
			}
		case FuncMIN:
			var min interface{}
			var minValue *schema_pb.Value
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if minValue == nil || e.compareValues(value, minValue) < 0 {
						minValue = value
						min = e.extractRawValue(value)
					}
				}
			}
			aggResults[i].Min = min
		case FuncMAX:
			var max interface{}
			var maxValue *schema_pb.Value
			for _, result := range results {
				if value := e.findColumnValue(result, spec.Column); value != nil {
					if maxValue == nil || e.compareValues(value, maxValue) > 0 {
						maxValue = value
						max = e.extractRawValue(value)
					}
				}
			}
			aggResults[i].Max = max
		}
	}
	return aggResults
}
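
// Note: AVG reuses the Sum field to carry the computed average, with Count holding
// the number of values that contributed. Downstream formatting (formatAggregationResult,
// defined elsewhere) is therefore expected to read the average from Sum for FuncAVG.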

// canUseParquetStatsForAggregation determines whether an aggregation can be optimized with parquet stats
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
	switch spec.Function {
	case FuncCOUNT:
		return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	case FuncMIN, FuncMAX:
		return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
	case FuncSUM, FuncAVG:
		// These require scanning the actual values, not just min/max
		return false
	default:
		return false
	}
}
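
// In practice this means COUNT(*), COUNT(col), MIN(col), and MAX(col) can be served
// from metadata via the fast path, while a query like SELECT AVG(amount) FROM events
// (names hypothetical) always takes the full-scan path through computeAggregations.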

// debugHybridScanOptions logs the exact scan options being used
func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) {
	if isDebugMode(ctx) {
		fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType)
		fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs)
		fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs)
		fmt.Printf("Limit: %d\n", options.Limit)
		fmt.Printf("Offset: %d\n", options.Offset)
		fmt.Printf("Predicate: %v\n", options.Predicate != nil)
		fmt.Printf("Columns: %v\n", options.Columns)
		fmt.Printf("==========================================\n")
	}
}