// fast_path_fix_test.go
package engine

import (
	"context"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/stretchr/testify/assert"
)
  8. // TestFastPathCountFixRealistic tests the specific scenario mentioned in the bug report:
  9. // Fast path returning 0 for COUNT(*) when slow path returns 1803
  10. func TestFastPathCountFixRealistic(t *testing.T) {
  11. engine := NewMockSQLEngine()
  12. // Set up debug mode to see our new logging
  13. ctx := context.WithValue(context.Background(), "debug", true)
  14. // Create realistic data sources that mimic a scenario with 1803 rows
  15. dataSources := &TopicDataSources{
  16. ParquetFiles: map[string][]*ParquetFileStats{
  17. "/topics/test/large-topic/0000-1023": {
  18. {
  19. RowCount: 800,
  20. ColumnStats: map[string]*ParquetColumnStats{
  21. "id": {
  22. ColumnName: "id",
  23. MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1}},
  24. MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 800}},
  25. NullCount: 0,
  26. RowCount: 800,
  27. },
  28. },
  29. },
  30. {
  31. RowCount: 500,
  32. ColumnStats: map[string]*ParquetColumnStats{
  33. "id": {
  34. ColumnName: "id",
  35. MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 801}},
  36. MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1300}},
  37. NullCount: 0,
  38. RowCount: 500,
  39. },
  40. },
  41. },
  42. },
  43. "/topics/test/large-topic/1024-2047": {
  44. {
  45. RowCount: 300,
  46. ColumnStats: map[string]*ParquetColumnStats{
  47. "id": {
  48. ColumnName: "id",
  49. MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1301}},
  50. MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1600}},
  51. NullCount: 0,
  52. RowCount: 300,
  53. },
  54. },
  55. },
  56. },
  57. },
  58. ParquetRowCount: 1600, // 800 + 500 + 300
  59. LiveLogRowCount: 203, // Additional live log data
  60. PartitionsCount: 2,
  61. LiveLogFilesCount: 15,
  62. }
  63. partitions := []string{
  64. "/topics/test/large-topic/0000-1023",
  65. "/topics/test/large-topic/1024-2047",
  66. }
  67. t.Run("COUNT(*) should return correct total (1803)", func(t *testing.T) {
  68. computer := NewAggregationComputer(engine.SQLEngine)
  69. aggregations := []AggregationSpec{
  70. {Function: FuncCOUNT, Column: "*", Alias: "COUNT(*)"},
  71. }
  72. results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
  73. assert.NoError(t, err, "Fast path aggregation should not error")
  74. assert.Len(t, results, 1, "Should return one result")
  75. // This is the key test - before our fix, this was returning 0
  76. expectedCount := int64(1803) // 1600 (parquet) + 203 (live log)
  77. actualCount := results[0].Count
  78. assert.Equal(t, expectedCount, actualCount,
  79. "COUNT(*) should return %d (1600 parquet + 203 live log), but got %d",
  80. expectedCount, actualCount)
  81. })
  82. t.Run("MIN/MAX should work with multiple partitions", func(t *testing.T) {
  83. computer := NewAggregationComputer(engine.SQLEngine)
  84. aggregations := []AggregationSpec{
  85. {Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
  86. {Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
  87. }
  88. results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
  89. assert.NoError(t, err, "Fast path aggregation should not error")
  90. assert.Len(t, results, 2, "Should return two results")
  91. // MIN should be the lowest across all parquet files
  92. assert.Equal(t, int64(1), results[0].Min, "MIN should be 1")
  93. // MAX should be the highest across all parquet files
  94. assert.Equal(t, int64(1600), results[1].Max, "MAX should be 1600")
  95. })
  96. }
  97. // TestFastPathDataSourceDiscoveryLogging tests that our debug logging works correctly
  98. func TestFastPathDataSourceDiscoveryLogging(t *testing.T) {
  99. // This test verifies that our enhanced data source collection structure is correct
  100. t.Run("DataSources structure validation", func(t *testing.T) {
  101. // Test the TopicDataSources structure initialization
  102. dataSources := &TopicDataSources{
  103. ParquetFiles: make(map[string][]*ParquetFileStats),
  104. ParquetRowCount: 0,
  105. LiveLogRowCount: 0,
  106. LiveLogFilesCount: 0,
  107. PartitionsCount: 0,
  108. }
  109. assert.NotNil(t, dataSources, "Data sources should not be nil")
  110. assert.NotNil(t, dataSources.ParquetFiles, "ParquetFiles map should be initialized")
  111. assert.GreaterOrEqual(t, dataSources.PartitionsCount, 0, "PartitionsCount should be non-negative")
  112. assert.GreaterOrEqual(t, dataSources.ParquetRowCount, int64(0), "ParquetRowCount should be non-negative")
  113. assert.GreaterOrEqual(t, dataSources.LiveLogRowCount, int64(0), "LiveLogRowCount should be non-negative")
  114. })
  115. }
  116. // TestFastPathValidationLogic tests the enhanced validation we added
  117. func TestFastPathValidationLogic(t *testing.T) {
  118. t.Run("Validation catches data source vs computation mismatch", func(t *testing.T) {
  119. // Create a scenario where data sources and computation might be inconsistent
  120. dataSources := &TopicDataSources{
  121. ParquetFiles: make(map[string][]*ParquetFileStats),
  122. ParquetRowCount: 1000, // Data sources say 1000 rows
  123. LiveLogRowCount: 0,
  124. PartitionsCount: 1,
  125. }
  126. // But aggregation result says different count (simulating the original bug)
  127. aggResults := []AggregationResult{
  128. {Count: 0}, // Bug: returns 0 when data sources show 1000
  129. }
  130. // This simulates the validation logic from tryFastParquetAggregation
  131. totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
  132. countResult := aggResults[0].Count
  133. // Our validation should catch this mismatch
  134. assert.NotEqual(t, totalRows, countResult,
  135. "This test simulates the bug: data sources show %d but COUNT returns %d",
  136. totalRows, countResult)
  137. // In the real code, this would trigger a fallback to slow path
  138. validationPassed := (countResult == totalRows)
  139. assert.False(t, validationPassed, "Validation should fail for inconsistent data")
  140. })
  141. t.Run("Validation passes for consistent data", func(t *testing.T) {
  142. // Create a scenario where everything is consistent
  143. dataSources := &TopicDataSources{
  144. ParquetFiles: make(map[string][]*ParquetFileStats),
  145. ParquetRowCount: 1000,
  146. LiveLogRowCount: 803,
  147. PartitionsCount: 1,
  148. }
  149. // Aggregation result matches data sources
  150. aggResults := []AggregationResult{
  151. {Count: 1803}, // Correct: matches 1000 + 803
  152. }
  153. totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
  154. countResult := aggResults[0].Count
  155. // Our validation should pass this
  156. assert.Equal(t, totalRows, countResult,
  157. "Validation should pass when data sources (%d) match COUNT result (%d)",
  158. totalRows, countResult)
  159. validationPassed := (countResult == totalRows)
  160. assert.True(t, validationPassed, "Validation should pass for consistent data")
  161. })
  162. }