// engine_test.go — unit and benchmark tests for the SQL engine's fast-path
// aggregation optimizer, aggregation computer, and execution plan builder.
  1. package engine
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "errors"
  6. "testing"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/stretchr/testify/mock"
  12. "google.golang.org/protobuf/proto"
  13. )
  14. // Mock implementations for testing
// MockHybridMessageScanner is a testify-based mock of the hybrid message
// scanner, used to stub parquet statistics reads for a single topic.
type MockHybridMessageScanner struct {
	mock.Mock
	topic topic.Topic // topic this scanner is bound to; set by tests as needed
}
  19. func (m *MockHybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
  20. args := m.Called(partitionPath)
  21. return args.Get(0).([]*ParquetFileStats), args.Error(1)
  22. }
// MockSQLEngine wraps a real SQLEngine with canned lookup tables so tests can
// drive aggregation logic without a live filer/broker behind it.
type MockSQLEngine struct {
	*SQLEngine
	mockPartitions         map[string][]string                       // "namespace.topic" -> partition paths
	mockParquetSourceFiles map[string]map[string]bool                // partition -> set of parquet source log files
	mockLiveLogRowCounts   map[string]int64                          // partition -> live-log row count
	mockColumnStats        map[string]map[string]*ParquetColumnStats // partition -> column -> stats
}
  30. func NewMockSQLEngine() *MockSQLEngine {
  31. return &MockSQLEngine{
  32. SQLEngine: &SQLEngine{
  33. catalog: &SchemaCatalog{
  34. databases: make(map[string]*DatabaseInfo),
  35. currentDatabase: "test",
  36. },
  37. },
  38. mockPartitions: make(map[string][]string),
  39. mockParquetSourceFiles: make(map[string]map[string]bool),
  40. mockLiveLogRowCounts: make(map[string]int64),
  41. mockColumnStats: make(map[string]map[string]*ParquetColumnStats),
  42. }
  43. }
  44. func (m *MockSQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
  45. key := namespace + "." + topicName
  46. if partitions, exists := m.mockPartitions[key]; exists {
  47. return partitions, nil
  48. }
  49. return []string{"partition-1", "partition-2"}, nil
  50. }
  51. func (m *MockSQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
  52. if len(fileStats) == 0 {
  53. return make(map[string]bool)
  54. }
  55. return map[string]bool{"converted-log-1": true}
  56. }
  57. func (m *MockSQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partition string, parquetSources map[string]bool) (int64, error) {
  58. if count, exists := m.mockLiveLogRowCounts[partition]; exists {
  59. return count, nil
  60. }
  61. return 25, nil
  62. }
  63. func (m *MockSQLEngine) computeLiveLogMinMax(partition, column string, parquetSources map[string]bool) (interface{}, interface{}, error) {
  64. switch column {
  65. case "id":
  66. return int64(1), int64(50), nil
  67. case "value":
  68. return 10.5, 99.9, nil
  69. default:
  70. return nil, nil, nil
  71. }
  72. }
// getSystemColumnGlobalMin returns a fixed lower bound for any system column,
// ignoring the supplied file stats entirely (mock behavior).
func (m *MockSQLEngine) getSystemColumnGlobalMin(column string, allFileStats map[string][]*ParquetFileStats) interface{} {
	return int64(1000000000)
}
// getSystemColumnGlobalMax returns a fixed upper bound for any system column,
// ignoring the supplied file stats entirely (mock behavior).
func (m *MockSQLEngine) getSystemColumnGlobalMax(column string, allFileStats map[string][]*ParquetFileStats) interface{} {
	return int64(2000000000)
}
  79. func createMockColumnStats(column string, minVal, maxVal interface{}) *ParquetColumnStats {
  80. return &ParquetColumnStats{
  81. ColumnName: column,
  82. MinValue: convertToSchemaValue(minVal),
  83. MaxValue: convertToSchemaValue(maxVal),
  84. NullCount: 0,
  85. }
  86. }
  87. func convertToSchemaValue(val interface{}) *schema_pb.Value {
  88. switch v := val.(type) {
  89. case int64:
  90. return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v}}
  91. case float64:
  92. return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}}
  93. case string:
  94. return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v}}
  95. }
  96. return nil
  97. }
  98. // Test FastPathOptimizer
// TestFastPathOptimizer_DetermineStrategy checks that the optimizer accepts
// COUNT/MAX/MIN-only aggregation lists (including the empty list) and rejects
// lists containing unsupported functions such as AVG.
func TestFastPathOptimizer_DetermineStrategy(t *testing.T) {
	engine := NewMockSQLEngine()
	optimizer := NewFastPathOptimizer(engine.SQLEngine)
	tests := []struct {
		name         string
		aggregations []AggregationSpec
		expected     AggregationStrategy
	}{
		{
			name: "Supported aggregations",
			aggregations: []AggregationSpec{
				{Function: FuncCOUNT, Column: "*"},
				{Function: FuncMAX, Column: "id"},
				{Function: FuncMIN, Column: "value"},
			},
			expected: AggregationStrategy{
				CanUseFastPath:   true,
				Reason:           "all_aggregations_supported",
				UnsupportedSpecs: []AggregationSpec{},
			},
		},
		{
			name: "Unsupported aggregation",
			aggregations: []AggregationSpec{
				{Function: FuncCOUNT, Column: "*"},
				{Function: FuncAVG, Column: "value"}, // Not supported
			},
			expected: AggregationStrategy{
				CanUseFastPath: false,
				Reason:         "unsupported_aggregation_functions",
			},
		},
		{
			// An empty aggregation list is trivially fully supported.
			name:         "Empty aggregations",
			aggregations: []AggregationSpec{},
			expected: AggregationStrategy{
				CanUseFastPath:   true,
				Reason:           "all_aggregations_supported",
				UnsupportedSpecs: []AggregationSpec{},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			strategy := optimizer.DetermineStrategy(tt.aggregations)
			assert.Equal(t, tt.expected.CanUseFastPath, strategy.CanUseFastPath)
			assert.Equal(t, tt.expected.Reason, strategy.Reason)
			// A rejection must name the offending specs so callers can report them.
			if !tt.expected.CanUseFastPath {
				assert.NotEmpty(t, strategy.UnsupportedSpecs)
			}
		})
	}
}
  152. // Test AggregationComputer
// TestAggregationComputer_ComputeFastPathAggregations exercises COUNT/MAX/MIN
// over a single partition with 30 parquet rows (id in [10, 40]) plus 25
// live-log rows reported by the mock engine.
func TestAggregationComputer_ComputeFastPathAggregations(t *testing.T) {
	engine := NewMockSQLEngine()
	computer := NewAggregationComputer(engine.SQLEngine)
	dataSources := &TopicDataSources{
		ParquetFiles: map[string][]*ParquetFileStats{
			"/topics/test/topic1/partition-1": {
				{
					RowCount: 30,
					ColumnStats: map[string]*ParquetColumnStats{
						"id": createMockColumnStats("id", int64(10), int64(40)),
					},
				},
			},
		},
		ParquetRowCount: 30,
		LiveLogRowCount: 25,
		PartitionsCount: 1,
	}
	partitions := []string{"/topics/test/topic1/partition-1"}
	tests := []struct {
		name         string
		aggregations []AggregationSpec
		validate     func(t *testing.T, results []AggregationResult)
	}{
		{
			name: "COUNT aggregation",
			aggregations: []AggregationSpec{
				{Function: FuncCOUNT, Column: "*"},
			},
			validate: func(t *testing.T, results []AggregationResult) {
				assert.Len(t, results, 1)
				assert.Equal(t, int64(55), results[0].Count) // 30 + 25
			},
		},
		{
			name: "MAX aggregation",
			aggregations: []AggregationSpec{
				{Function: FuncMAX, Column: "id"},
			},
			validate: func(t *testing.T, results []AggregationResult) {
				assert.Len(t, results, 1)
				// Should be max of parquet stats (40) - mock doesn't combine with live log
				assert.Equal(t, int64(40), results[0].Max)
			},
		},
		{
			name: "MIN aggregation",
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "id"},
			},
			validate: func(t *testing.T, results []AggregationResult) {
				assert.Len(t, results, 1)
				// Should be min of parquet stats (10) - mock doesn't combine with live log
				assert.Equal(t, int64(10), results[0].Min)
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx := context.Background()
			results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, dataSources, partitions)
			assert.NoError(t, err)
			tt.validate(t, results)
		})
	}
}
  219. // Test case-insensitive column lookup and null handling for MIN/MAX aggregations
// TestAggregationComputer_MinMaxEdgeCases covers MIN/MAX edge cases against
// parquet statistics only (LiveLogRowCount mostly 0): case-insensitive column
// lookup, null stats, string and float columns, a missing column, and global
// min/max across multiple parquet files.
func TestAggregationComputer_MinMaxEdgeCases(t *testing.T) {
	engine := NewMockSQLEngine()
	computer := NewAggregationComputer(engine.SQLEngine)
	tests := []struct {
		name         string
		dataSources  *TopicDataSources
		aggregations []AggregationSpec
		validate     func(t *testing.T, results []AggregationResult, err error)
	}{
		{
			name: "Case insensitive column lookup",
			dataSources: &TopicDataSources{
				ParquetFiles: map[string][]*ParquetFileStats{
					"/topics/test/partition-1": {
						{
							RowCount: 50,
							ColumnStats: map[string]*ParquetColumnStats{
								"ID": createMockColumnStats("ID", int64(5), int64(95)), // Uppercase column name
							},
						},
					},
				},
				ParquetRowCount: 50,
				LiveLogRowCount: 0,
				PartitionsCount: 1,
			},
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "id"}, // lowercase column name
				{Function: FuncMAX, Column: "id"},
			},
			validate: func(t *testing.T, results []AggregationResult, err error) {
				assert.NoError(t, err)
				assert.Len(t, results, 2)
				assert.Equal(t, int64(5), results[0].Min, "MIN should work with case-insensitive lookup")
				assert.Equal(t, int64(95), results[1].Max, "MAX should work with case-insensitive lookup")
			},
		},
		{
			name: "Null column stats handling",
			dataSources: &TopicDataSources{
				ParquetFiles: map[string][]*ParquetFileStats{
					"/topics/test/partition-1": {
						{
							RowCount: 50,
							ColumnStats: map[string]*ParquetColumnStats{
								"id": {
									ColumnName: "id",
									MinValue:   nil, // Null min value
									MaxValue:   nil, // Null max value
									NullCount:  50,
									RowCount:   50,
								},
							},
						},
					},
				},
				ParquetRowCount: 50,
				LiveLogRowCount: 0,
				PartitionsCount: 1,
			},
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "id"},
				{Function: FuncMAX, Column: "id"},
			},
			validate: func(t *testing.T, results []AggregationResult, err error) {
				assert.NoError(t, err)
				assert.Len(t, results, 2)
				// When stats are null, should fall back to system column or return nil
				// This tests that we don't crash on null stats
			},
		},
		{
			name: "Mixed data types - string column",
			dataSources: &TopicDataSources{
				ParquetFiles: map[string][]*ParquetFileStats{
					"/topics/test/partition-1": {
						{
							RowCount: 30,
							ColumnStats: map[string]*ParquetColumnStats{
								"name": createMockColumnStats("name", "Alice", "Zoe"),
							},
						},
					},
				},
				ParquetRowCount: 30,
				LiveLogRowCount: 0,
				PartitionsCount: 1,
			},
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "name"},
				{Function: FuncMAX, Column: "name"},
			},
			validate: func(t *testing.T, results []AggregationResult, err error) {
				assert.NoError(t, err)
				assert.Len(t, results, 2)
				assert.Equal(t, "Alice", results[0].Min)
				assert.Equal(t, "Zoe", results[1].Max)
			},
		},
		{
			name: "Mixed data types - float column",
			dataSources: &TopicDataSources{
				ParquetFiles: map[string][]*ParquetFileStats{
					"/topics/test/partition-1": {
						{
							RowCount: 25,
							ColumnStats: map[string]*ParquetColumnStats{
								"price": createMockColumnStats("price", float64(19.99), float64(299.50)),
							},
						},
					},
				},
				ParquetRowCount: 25,
				LiveLogRowCount: 0,
				PartitionsCount: 1,
			},
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "price"},
				{Function: FuncMAX, Column: "price"},
			},
			validate: func(t *testing.T, results []AggregationResult, err error) {
				assert.NoError(t, err)
				assert.Len(t, results, 2)
				assert.Equal(t, float64(19.99), results[0].Min)
				assert.Equal(t, float64(299.50), results[1].Max)
			},
		},
		{
			name: "Column not found in parquet stats",
			dataSources: &TopicDataSources{
				ParquetFiles: map[string][]*ParquetFileStats{
					"/topics/test/partition-1": {
						{
							RowCount: 20,
							ColumnStats: map[string]*ParquetColumnStats{
								"id": createMockColumnStats("id", int64(1), int64(100)),
								// Note: "nonexistent_column" is not in stats
							},
						},
					},
				},
				ParquetRowCount: 20,
				LiveLogRowCount: 10, // Has live logs to fall back to
				PartitionsCount: 1,
			},
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "nonexistent_column"},
				{Function: FuncMAX, Column: "nonexistent_column"},
			},
			validate: func(t *testing.T, results []AggregationResult, err error) {
				assert.NoError(t, err)
				assert.Len(t, results, 2)
				// Should fall back to live log processing or return nil
				// The key is that it shouldn't crash
			},
		},
		{
			name: "Multiple parquet files with different ranges",
			dataSources: &TopicDataSources{
				ParquetFiles: map[string][]*ParquetFileStats{
					"/topics/test/partition-1": {
						{
							RowCount: 30,
							ColumnStats: map[string]*ParquetColumnStats{
								"score": createMockColumnStats("score", int64(10), int64(50)),
							},
						},
						{
							RowCount: 40,
							ColumnStats: map[string]*ParquetColumnStats{
								"score": createMockColumnStats("score", int64(5), int64(75)), // Lower min, higher max
							},
						},
					},
				},
				ParquetRowCount: 70,
				LiveLogRowCount: 0,
				PartitionsCount: 1,
			},
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "score"},
				{Function: FuncMAX, Column: "score"},
			},
			validate: func(t *testing.T, results []AggregationResult, err error) {
				assert.NoError(t, err)
				assert.Len(t, results, 2)
				assert.Equal(t, int64(5), results[0].Min, "Should find global minimum across all files")
				assert.Equal(t, int64(75), results[1].Max, "Should find global maximum across all files")
			},
		},
	}
	partitions := []string{"/topics/test/partition-1"}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx := context.Background()
			results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, tt.dataSources, partitions)
			tt.validate(t, results, err)
		})
	}
}
  420. // Test the specific bug where MIN/MAX was returning empty values
// TestAggregationComputer_MinMaxEmptyValuesBugFix is a regression test for the
// bug where MIN/MAX came back empty: with pure parquet stats (no live logs)
// covering id in [0, 99], MIN must surface 0 (a falsy value that the broken
// null-check once dropped) and MAX must surface 99.
func TestAggregationComputer_MinMaxEmptyValuesBugFix(t *testing.T) {
	engine := NewMockSQLEngine()
	computer := NewAggregationComputer(engine.SQLEngine)
	// This test specifically addresses the bug where MIN/MAX returned empty
	// due to improper null checking and extraction logic
	dataSources := &TopicDataSources{
		ParquetFiles: map[string][]*ParquetFileStats{
			"/topics/test/test-topic/partition1": {
				{
					RowCount: 100,
					ColumnStats: map[string]*ParquetColumnStats{
						"id": {
							ColumnName: "id",
							MinValue:   &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 0}},  // Min should be 0
							MaxValue:   &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 99}}, // Max should be 99
							NullCount:  0,
							RowCount:   100,
						},
					},
				},
			},
		},
		ParquetRowCount: 100,
		LiveLogRowCount: 0, // No live logs, pure parquet stats
		PartitionsCount: 1,
	}
	partitions := []string{"/topics/test/test-topic/partition1"}
	tests := []struct {
		name       string
		aggregSpec AggregationSpec
		expected   interface{}
	}{
		{
			name:       "MIN should return 0 not empty",
			aggregSpec: AggregationSpec{Function: FuncMIN, Column: "id"},
			expected:   int32(0), // Should extract the actual minimum value
		},
		{
			name:       "MAX should return 99 not empty",
			aggregSpec: AggregationSpec{Function: FuncMAX, Column: "id"},
			expected:   int32(99), // Should extract the actual maximum value
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx := context.Background()
			results, err := computer.ComputeFastPathAggregations(ctx, []AggregationSpec{tt.aggregSpec}, dataSources, partitions)
			assert.NoError(t, err)
			assert.Len(t, results, 1)
			// Verify the result is not nil/empty
			if tt.aggregSpec.Function == FuncMIN {
				assert.NotNil(t, results[0].Min, "MIN result should not be nil")
				assert.Equal(t, tt.expected, results[0].Min)
			} else if tt.aggregSpec.Function == FuncMAX {
				assert.NotNil(t, results[0].Max, "MAX result should not be nil")
				assert.Equal(t, tt.expected, results[0].Max)
			}
		})
	}
}
  481. // Test the formatAggregationResult function with MIN/MAX edge cases
// TestSQLEngine_FormatAggregationResult_MinMax checks string rendering of
// MIN/MAX results across value types: zero, negative, float, string, and nil
// (nil renders as an empty string, i.e. SQL NULL display).
func TestSQLEngine_FormatAggregationResult_MinMax(t *testing.T) {
	engine := NewTestSQLEngine()
	tests := []struct {
		name     string
		spec     AggregationSpec
		result   AggregationResult
		expected string
	}{
		{
			name:     "MIN with zero value should not be empty",
			spec:     AggregationSpec{Function: FuncMIN, Column: "id"},
			result:   AggregationResult{Min: int32(0)},
			expected: "0",
		},
		{
			name:     "MAX with large value",
			spec:     AggregationSpec{Function: FuncMAX, Column: "id"},
			result:   AggregationResult{Max: int32(99)},
			expected: "99",
		},
		{
			name:     "MIN with negative value",
			spec:     AggregationSpec{Function: FuncMIN, Column: "score"},
			result:   AggregationResult{Min: int64(-50)},
			expected: "-50",
		},
		{
			name:     "MAX with float value",
			spec:     AggregationSpec{Function: FuncMAX, Column: "price"},
			result:   AggregationResult{Max: float64(299.99)},
			expected: "299.99",
		},
		{
			name:     "MIN with string value",
			spec:     AggregationSpec{Function: FuncMIN, Column: "name"},
			result:   AggregationResult{Min: "Alice"},
			expected: "Alice",
		},
		{
			name:     "MIN with nil should return NULL",
			spec:     AggregationSpec{Function: FuncMIN, Column: "missing"},
			result:   AggregationResult{Min: nil},
			expected: "", // NULL values display as empty
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sqlValue := engine.formatAggregationResult(tt.spec, tt.result)
			assert.Equal(t, tt.expected, sqlValue.String())
		})
	}
}
  534. // Test the direct formatAggregationResult scenario that was originally broken
// TestSQLEngine_MinMaxBugFixIntegration drives formatAggregationResult
// directly over the scenarios the original bug broke: a MIN of zero must
// render "0" (not empty), and only nil results may render empty.
func TestSQLEngine_MinMaxBugFixIntegration(t *testing.T) {
	// This test focuses on the core bug fix without the complexity of table discovery
	// It directly tests the scenario where MIN/MAX returned empty due to the bug
	engine := NewTestSQLEngine()
	// Test the direct formatting path that was failing
	tests := []struct {
		name          string
		aggregSpec    AggregationSpec
		aggResult     AggregationResult
		expectedEmpty bool
		expectedValue string
	}{
		{
			name:          "MIN with zero should not be empty (the original bug)",
			aggregSpec:    AggregationSpec{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
			aggResult:     AggregationResult{Min: int32(0)}, // This was returning empty before fix
			expectedEmpty: false,
			expectedValue: "0",
		},
		{
			name:          "MAX with valid value should not be empty",
			aggregSpec:    AggregationSpec{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
			aggResult:     AggregationResult{Max: int32(99)},
			expectedEmpty: false,
			expectedValue: "99",
		},
		{
			name:          "MIN with negative value should work",
			aggregSpec:    AggregationSpec{Function: FuncMIN, Column: "score", Alias: "MIN(score)"},
			aggResult:     AggregationResult{Min: int64(-10)},
			expectedEmpty: false,
			expectedValue: "-10",
		},
		{
			name:          "MIN with nil should be empty (expected behavior)",
			aggregSpec:    AggregationSpec{Function: FuncMIN, Column: "missing", Alias: "MIN(missing)"},
			aggResult:     AggregationResult{Min: nil},
			expectedEmpty: true,
			expectedValue: "",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Test the formatAggregationResult function directly
			sqlValue := engine.formatAggregationResult(tt.aggregSpec, tt.aggResult)
			result := sqlValue.String()
			if tt.expectedEmpty {
				assert.Empty(t, result, "Result should be empty for nil values")
			} else {
				assert.NotEmpty(t, result, "Result should not be empty")
				assert.Equal(t, tt.expectedValue, result)
			}
		})
	}
}
  590. // Test the tryFastParquetAggregation method specifically for the bug
// TestSQLEngine_FastParquetAggregationBugFix verifies the fast-path computer
// returns concrete (non-nil) MIN/MAX values from pure parquet statistics,
// using a realistic partition path mirroring the originally reported scenario.
func TestSQLEngine_FastParquetAggregationBugFix(t *testing.T) {
	// This test verifies that the fast path aggregation logic works correctly
	// and doesn't return nil/empty values when it should return actual data
	engine := NewMockSQLEngine()
	computer := NewAggregationComputer(engine.SQLEngine)
	// Create realistic data sources that mimic the user's scenario
	dataSources := &TopicDataSources{
		ParquetFiles: map[string][]*ParquetFileStats{
			"/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630": {
				{
					RowCount: 100,
					ColumnStats: map[string]*ParquetColumnStats{
						"id": {
							ColumnName: "id",
							MinValue:   &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 0}},
							MaxValue:   &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: 99}},
							NullCount:  0,
							RowCount:   100,
						},
					},
				},
			},
		},
		ParquetRowCount: 100,
		LiveLogRowCount: 0, // Pure parquet scenario
		PartitionsCount: 1,
	}
	partitions := []string{"/topics/test/test-topic/v2025-09-01-22-54-02/0000-0630"}
	tests := []struct {
		name            string
		aggregations    []AggregationSpec
		validateResults func(t *testing.T, results []AggregationResult)
	}{
		{
			name: "Single MIN aggregation should return value not nil",
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
			},
			validateResults: func(t *testing.T, results []AggregationResult) {
				assert.Len(t, results, 1)
				assert.NotNil(t, results[0].Min, "MIN result should not be nil")
				assert.Equal(t, int32(0), results[0].Min, "MIN should return the correct minimum value")
			},
		},
		{
			name: "Single MAX aggregation should return value not nil",
			aggregations: []AggregationSpec{
				{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
			},
			validateResults: func(t *testing.T, results []AggregationResult) {
				assert.Len(t, results, 1)
				assert.NotNil(t, results[0].Max, "MAX result should not be nil")
				assert.Equal(t, int32(99), results[0].Max, "MAX should return the correct maximum value")
			},
		},
		{
			name: "Combined MIN/MAX should both return values",
			aggregations: []AggregationSpec{
				{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
				{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
			},
			validateResults: func(t *testing.T, results []AggregationResult) {
				assert.Len(t, results, 2)
				assert.NotNil(t, results[0].Min, "MIN result should not be nil")
				assert.NotNil(t, results[1].Max, "MAX result should not be nil")
				assert.Equal(t, int32(0), results[0].Min)
				assert.Equal(t, int32(99), results[1].Max)
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx := context.Background()
			results, err := computer.ComputeFastPathAggregations(ctx, tt.aggregations, dataSources, partitions)
			assert.NoError(t, err, "ComputeFastPathAggregations should not error")
			tt.validateResults(t, results)
		})
	}
}
  670. // Test ExecutionPlanBuilder
// TestExecutionPlanBuilder_BuildAggregationPlan builds a plan for a fast-path
// COUNT(*) over mixed sources (100 parquet rows + 50 live-log rows across 3
// partitions) and checks the reported strategy, sources, counters, and that
// TotalRowsProcessed counts only the live-log rows actually scanned.
func TestExecutionPlanBuilder_BuildAggregationPlan(t *testing.T) {
	engine := NewMockSQLEngine()
	builder := NewExecutionPlanBuilder(engine.SQLEngine)
	// Parse a simple SELECT statement using the native parser
	stmt, err := ParseSQL("SELECT COUNT(*) FROM test_topic")
	assert.NoError(t, err)
	selectStmt := stmt.(*SelectStatement)
	aggregations := []AggregationSpec{
		{Function: FuncCOUNT, Column: "*"},
	}
	strategy := AggregationStrategy{
		CanUseFastPath: true,
		Reason:         "all_aggregations_supported",
	}
	dataSources := &TopicDataSources{
		ParquetRowCount: 100,
		LiveLogRowCount: 50,
		PartitionsCount: 3,
		ParquetFiles: map[string][]*ParquetFileStats{
			"partition-1": {{RowCount: 50}},
			"partition-2": {{RowCount: 50}},
		},
	}
	plan := builder.BuildAggregationPlan(selectStmt, aggregations, strategy, dataSources)
	assert.Equal(t, "SELECT", plan.QueryType)
	assert.Equal(t, "hybrid_fast_path", plan.ExecutionStrategy)
	assert.Contains(t, plan.DataSources, "parquet_stats")
	assert.Contains(t, plan.DataSources, "live_logs")
	assert.Equal(t, 3, plan.PartitionsScanned)
	assert.Equal(t, 2, plan.ParquetFilesScanned)
	assert.Contains(t, plan.OptimizationsUsed, "parquet_statistics")
	assert.Equal(t, []string{"COUNT(*)"}, plan.Aggregations)
	assert.Equal(t, int64(50), plan.TotalRowsProcessed) // Only live logs scanned
}
  705. // Test Error Types
  706. func TestErrorTypes(t *testing.T) {
  707. t.Run("AggregationError", func(t *testing.T) {
  708. err := AggregationError{
  709. Operation: "MAX",
  710. Column: "id",
  711. Cause: errors.New("column not found"),
  712. }
  713. expected := "aggregation error in MAX(id): column not found"
  714. assert.Equal(t, expected, err.Error())
  715. })
  716. t.Run("DataSourceError", func(t *testing.T) {
  717. err := DataSourceError{
  718. Source: "partition_discovery:test.topic1",
  719. Cause: errors.New("network timeout"),
  720. }
  721. expected := "data source error in partition_discovery:test.topic1: network timeout"
  722. assert.Equal(t, expected, err.Error())
  723. })
  724. t.Run("OptimizationError", func(t *testing.T) {
  725. err := OptimizationError{
  726. Strategy: "fast_path_aggregation",
  727. Reason: "unsupported function: AVG",
  728. }
  729. expected := "optimization failed for fast_path_aggregation: unsupported function: AVG"
  730. assert.Equal(t, expected, err.Error())
  731. })
  732. }
  733. // Integration Tests
// TestIntegration_FastPathOptimization walks the full fast path end to end:
// strategy selection for COUNT(*)/MAX(id), then aggregation over 75 parquet
// rows (id in [1, 100]) plus 25 live-log rows from the mock engine.
func TestIntegration_FastPathOptimization(t *testing.T) {
	engine := NewMockSQLEngine()
	// Setup components
	optimizer := NewFastPathOptimizer(engine.SQLEngine)
	computer := NewAggregationComputer(engine.SQLEngine)
	// Mock data setup
	aggregations := []AggregationSpec{
		{Function: FuncCOUNT, Column: "*"},
		{Function: FuncMAX, Column: "id"},
	}
	// Step 1: Determine strategy
	strategy := optimizer.DetermineStrategy(aggregations)
	assert.True(t, strategy.CanUseFastPath)
	// Step 2: Mock data sources
	dataSources := &TopicDataSources{
		ParquetFiles: map[string][]*ParquetFileStats{
			"/topics/test/topic1/partition-1": {{
				RowCount: 75,
				ColumnStats: map[string]*ParquetColumnStats{
					"id": createMockColumnStats("id", int64(1), int64(100)),
				},
			}},
		},
		ParquetRowCount: 75,
		LiveLogRowCount: 25,
		PartitionsCount: 1,
	}
	partitions := []string{"/topics/test/topic1/partition-1"}
	// Step 3: Compute aggregations
	ctx := context.Background()
	results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	assert.NoError(t, err)
	assert.Len(t, results, 2)
	assert.Equal(t, int64(100), results[0].Count) // 75 + 25
	assert.Equal(t, int64(100), results[1].Max)   // From parquet stats mock
}
  770. func TestIntegration_FallbackToFullScan(t *testing.T) {
  771. engine := NewMockSQLEngine()
  772. optimizer := NewFastPathOptimizer(engine.SQLEngine)
  773. // Unsupported aggregations
  774. aggregations := []AggregationSpec{
  775. {Function: "AVG", Column: "value"}, // Not supported
  776. }
  777. // Step 1: Strategy should reject fast path
  778. strategy := optimizer.DetermineStrategy(aggregations)
  779. assert.False(t, strategy.CanUseFastPath)
  780. assert.Equal(t, "unsupported_aggregation_functions", strategy.Reason)
  781. assert.NotEmpty(t, strategy.UnsupportedSpecs)
  782. }
  783. // Benchmark Tests
  784. func BenchmarkFastPathOptimizer_DetermineStrategy(b *testing.B) {
  785. engine := NewMockSQLEngine()
  786. optimizer := NewFastPathOptimizer(engine.SQLEngine)
  787. aggregations := []AggregationSpec{
  788. {Function: FuncCOUNT, Column: "*"},
  789. {Function: FuncMAX, Column: "id"},
  790. {Function: "MIN", Column: "value"},
  791. }
  792. b.ResetTimer()
  793. for i := 0; i < b.N; i++ {
  794. strategy := optimizer.DetermineStrategy(aggregations)
  795. _ = strategy.CanUseFastPath
  796. }
  797. }
  798. func BenchmarkAggregationComputer_ComputeFastPathAggregations(b *testing.B) {
  799. engine := NewMockSQLEngine()
  800. computer := NewAggregationComputer(engine.SQLEngine)
  801. dataSources := &TopicDataSources{
  802. ParquetFiles: map[string][]*ParquetFileStats{
  803. "partition-1": {{
  804. RowCount: 1000,
  805. ColumnStats: map[string]*ParquetColumnStats{
  806. "id": createMockColumnStats("id", int64(1), int64(1000)),
  807. },
  808. }},
  809. },
  810. ParquetRowCount: 1000,
  811. LiveLogRowCount: 100,
  812. }
  813. aggregations := []AggregationSpec{
  814. {Function: FuncCOUNT, Column: "*"},
  815. {Function: FuncMAX, Column: "id"},
  816. }
  817. partitions := []string{"partition-1"}
  818. ctx := context.Background()
  819. b.ResetTimer()
  820. for i := 0; i < b.N; i++ {
  821. results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
  822. if err != nil {
  823. b.Fatal(err)
  824. }
  825. _ = results
  826. }
  827. }
  828. // Tests for convertLogEntryToRecordValue - Protocol Buffer parsing bug fix
  829. func TestSQLEngine_ConvertLogEntryToRecordValue_ValidProtobuf(t *testing.T) {
  830. engine := NewTestSQLEngine()
  831. // Create a valid RecordValue protobuf with user data
  832. originalRecord := &schema_pb.RecordValue{
  833. Fields: map[string]*schema_pb.Value{
  834. "id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 42}},
  835. "name": {Kind: &schema_pb.Value_StringValue{StringValue: "test-user"}},
  836. "score": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 95.5}},
  837. },
  838. }
  839. // Serialize the protobuf (this is what MQ actually stores)
  840. protobufData, err := proto.Marshal(originalRecord)
  841. assert.NoError(t, err)
  842. // Create a LogEntry with the serialized data
  843. logEntry := &filer_pb.LogEntry{
  844. TsNs: 1609459200000000000, // 2021-01-01 00:00:00 UTC
  845. PartitionKeyHash: 123,
  846. Data: protobufData, // Protocol buffer data (not JSON!)
  847. Key: []byte("test-key-001"),
  848. }
  849. // Test the conversion
  850. result, source, err := engine.convertLogEntryToRecordValue(logEntry)
  851. // Verify no error
  852. assert.NoError(t, err)
  853. assert.Equal(t, "live_log", source)
  854. assert.NotNil(t, result)
  855. assert.NotNil(t, result.Fields)
  856. // Verify system columns are added correctly
  857. assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
  858. assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
  859. assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
  860. assert.Equal(t, []byte("test-key-001"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
  861. // Verify user data is preserved
  862. assert.Contains(t, result.Fields, "id")
  863. assert.Contains(t, result.Fields, "name")
  864. assert.Contains(t, result.Fields, "score")
  865. assert.Equal(t, int32(42), result.Fields["id"].GetInt32Value())
  866. assert.Equal(t, "test-user", result.Fields["name"].GetStringValue())
  867. assert.Equal(t, 95.5, result.Fields["score"].GetDoubleValue())
  868. }
  869. func TestSQLEngine_ConvertLogEntryToRecordValue_InvalidProtobuf(t *testing.T) {
  870. engine := NewTestSQLEngine()
  871. // Create LogEntry with invalid protobuf data (this would cause the original JSON parsing bug)
  872. logEntry := &filer_pb.LogEntry{
  873. TsNs: 1609459200000000000,
  874. PartitionKeyHash: 123,
  875. Data: []byte{0x17, 0x00, 0xFF, 0xFE}, // Invalid protobuf data (starts with \x17 like in the original error)
  876. Key: []byte("test-key"),
  877. }
  878. // Test the conversion
  879. result, source, err := engine.convertLogEntryToRecordValue(logEntry)
  880. // Should return error for invalid protobuf
  881. assert.Error(t, err)
  882. assert.Contains(t, err.Error(), "failed to unmarshal log entry protobuf")
  883. assert.Nil(t, result)
  884. assert.Empty(t, source)
  885. }
  886. func TestSQLEngine_ConvertLogEntryToRecordValue_EmptyProtobuf(t *testing.T) {
  887. engine := NewTestSQLEngine()
  888. // Create a minimal valid RecordValue (empty fields)
  889. emptyRecord := &schema_pb.RecordValue{
  890. Fields: map[string]*schema_pb.Value{},
  891. }
  892. protobufData, err := proto.Marshal(emptyRecord)
  893. assert.NoError(t, err)
  894. logEntry := &filer_pb.LogEntry{
  895. TsNs: 1609459200000000000,
  896. PartitionKeyHash: 456,
  897. Data: protobufData,
  898. Key: []byte("empty-key"),
  899. }
  900. // Test the conversion
  901. result, source, err := engine.convertLogEntryToRecordValue(logEntry)
  902. // Should succeed and add system columns
  903. assert.NoError(t, err)
  904. assert.Equal(t, "live_log", source)
  905. assert.NotNil(t, result)
  906. assert.NotNil(t, result.Fields)
  907. // Should have system columns
  908. assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
  909. assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
  910. assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
  911. assert.Equal(t, []byte("empty-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
  912. // Should have no user fields
  913. userFieldCount := 0
  914. for fieldName := range result.Fields {
  915. if fieldName != SW_COLUMN_NAME_TIMESTAMP && fieldName != SW_COLUMN_NAME_KEY {
  916. userFieldCount++
  917. }
  918. }
  919. assert.Equal(t, 0, userFieldCount)
  920. }
  921. func TestSQLEngine_ConvertLogEntryToRecordValue_NilFieldsMap(t *testing.T) {
  922. engine := NewTestSQLEngine()
  923. // Create RecordValue with nil Fields map (edge case)
  924. recordWithNilFields := &schema_pb.RecordValue{
  925. Fields: nil, // This should be handled gracefully
  926. }
  927. protobufData, err := proto.Marshal(recordWithNilFields)
  928. assert.NoError(t, err)
  929. logEntry := &filer_pb.LogEntry{
  930. TsNs: 1609459200000000000,
  931. PartitionKeyHash: 789,
  932. Data: protobufData,
  933. Key: []byte("nil-fields-key"),
  934. }
  935. // Test the conversion
  936. result, source, err := engine.convertLogEntryToRecordValue(logEntry)
  937. // Should succeed and create Fields map
  938. assert.NoError(t, err)
  939. assert.Equal(t, "live_log", source)
  940. assert.NotNil(t, result)
  941. assert.NotNil(t, result.Fields) // Should be created by the function
  942. // Should have system columns
  943. assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
  944. assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
  945. assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
  946. assert.Equal(t, []byte("nil-fields-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
  947. }
  948. func TestSQLEngine_ConvertLogEntryToRecordValue_SystemColumnOverride(t *testing.T) {
  949. engine := NewTestSQLEngine()
  950. // Create RecordValue that already has system column names (should be overridden)
  951. recordWithSystemCols := &schema_pb.RecordValue{
  952. Fields: map[string]*schema_pb.Value{
  953. "user_field": {Kind: &schema_pb.Value_StringValue{StringValue: "user-data"}},
  954. SW_COLUMN_NAME_TIMESTAMP: {Kind: &schema_pb.Value_Int64Value{Int64Value: 999999999}}, // Should be overridden
  955. SW_COLUMN_NAME_KEY: {Kind: &schema_pb.Value_StringValue{StringValue: "old-key"}}, // Should be overridden
  956. },
  957. }
  958. protobufData, err := proto.Marshal(recordWithSystemCols)
  959. assert.NoError(t, err)
  960. logEntry := &filer_pb.LogEntry{
  961. TsNs: 1609459200000000000,
  962. PartitionKeyHash: 100,
  963. Data: protobufData,
  964. Key: []byte("actual-key"),
  965. }
  966. // Test the conversion
  967. result, source, err := engine.convertLogEntryToRecordValue(logEntry)
  968. // Should succeed
  969. assert.NoError(t, err)
  970. assert.Equal(t, "live_log", source)
  971. assert.NotNil(t, result)
  972. // System columns should use LogEntry values, not protobuf values
  973. assert.Equal(t, int64(1609459200000000000), result.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value())
  974. assert.Equal(t, []byte("actual-key"), result.Fields[SW_COLUMN_NAME_KEY].GetBytesValue())
  975. // User field should be preserved
  976. assert.Contains(t, result.Fields, "user_field")
  977. assert.Equal(t, "user-data", result.Fields["user_field"].GetStringValue())
  978. }
  979. func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
  980. engine := NewTestSQLEngine()
  981. // Test with various data types
  982. complexRecord := &schema_pb.RecordValue{
  983. Fields: map[string]*schema_pb.Value{
  984. "int32_field": {Kind: &schema_pb.Value_Int32Value{Int32Value: -42}},
  985. "int64_field": {Kind: &schema_pb.Value_Int64Value{Int64Value: 9223372036854775807}},
  986. "float_field": {Kind: &schema_pb.Value_FloatValue{FloatValue: 3.14159}},
  987. "double_field": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 2.718281828}},
  988. "bool_field": {Kind: &schema_pb.Value_BoolValue{BoolValue: true}},
  989. "string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode 🎉"}},
  990. "bytes_field": {Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{0x01, 0x02, 0x03}}},
  991. },
  992. }
  993. protobufData, err := proto.Marshal(complexRecord)
  994. assert.NoError(t, err)
  995. logEntry := &filer_pb.LogEntry{
  996. TsNs: 1609459200000000000,
  997. PartitionKeyHash: 200,
  998. Data: protobufData,
  999. Key: []byte("complex-key"),
  1000. }
  1001. // Test the conversion
  1002. result, source, err := engine.convertLogEntryToRecordValue(logEntry)
  1003. // Should succeed
  1004. assert.NoError(t, err)
  1005. assert.Equal(t, "live_log", source)
  1006. assert.NotNil(t, result)
  1007. // Verify all data types are preserved
  1008. assert.Equal(t, int32(-42), result.Fields["int32_field"].GetInt32Value())
  1009. assert.Equal(t, int64(9223372036854775807), result.Fields["int64_field"].GetInt64Value())
  1010. assert.Equal(t, float32(3.14159), result.Fields["float_field"].GetFloatValue())
  1011. assert.Equal(t, 2.718281828, result.Fields["double_field"].GetDoubleValue())
  1012. assert.Equal(t, true, result.Fields["bool_field"].GetBoolValue())
  1013. assert.Equal(t, "test string with unicode 🎉", result.Fields["string_field"].GetStringValue())
  1014. assert.Equal(t, []byte{0x01, 0x02, 0x03}, result.Fields["bytes_field"].GetBytesValue())
  1015. // System columns should still be present
  1016. assert.Contains(t, result.Fields, SW_COLUMN_NAME_TIMESTAMP)
  1017. assert.Contains(t, result.Fields, SW_COLUMN_NAME_KEY)
  1018. }
  1019. // Tests for log buffer deduplication functionality
  1020. func TestSQLEngine_GetLogBufferStartFromFile_BinaryFormat(t *testing.T) {
  1021. engine := NewTestSQLEngine()
  1022. // Create sample buffer start (binary format)
  1023. bufferStartBytes := make([]byte, 8)
  1024. binary.BigEndian.PutUint64(bufferStartBytes, uint64(1609459100000000001))
  1025. // Create file entry with buffer start + some chunks
  1026. entry := &filer_pb.Entry{
  1027. Name: "test-log-file",
  1028. Extended: map[string][]byte{
  1029. "buffer_start": bufferStartBytes,
  1030. },
  1031. Chunks: []*filer_pb.FileChunk{
  1032. {FileId: "chunk1", Offset: 0, Size: 1000},
  1033. {FileId: "chunk2", Offset: 1000, Size: 1000},
  1034. {FileId: "chunk3", Offset: 2000, Size: 1000},
  1035. },
  1036. }
  1037. // Test extraction
  1038. result, err := engine.getLogBufferStartFromFile(entry)
  1039. assert.NoError(t, err)
  1040. assert.NotNil(t, result)
  1041. assert.Equal(t, int64(1609459100000000001), result.StartIndex)
  1042. // Test extraction works correctly with the binary format
  1043. }
  1044. func TestSQLEngine_GetLogBufferStartFromFile_NoMetadata(t *testing.T) {
  1045. engine := NewTestSQLEngine()
  1046. // Create file entry without buffer start
  1047. entry := &filer_pb.Entry{
  1048. Name: "test-log-file",
  1049. Extended: nil,
  1050. }
  1051. // Test extraction
  1052. result, err := engine.getLogBufferStartFromFile(entry)
  1053. assert.NoError(t, err)
  1054. assert.Nil(t, result)
  1055. }
  1056. func TestSQLEngine_GetLogBufferStartFromFile_InvalidData(t *testing.T) {
  1057. engine := NewTestSQLEngine()
  1058. // Create file entry with invalid buffer start (wrong size)
  1059. entry := &filer_pb.Entry{
  1060. Name: "test-log-file",
  1061. Extended: map[string][]byte{
  1062. "buffer_start": []byte("invalid-binary"),
  1063. },
  1064. }
  1065. // Test extraction
  1066. result, err := engine.getLogBufferStartFromFile(entry)
  1067. assert.Error(t, err)
  1068. assert.Contains(t, err.Error(), "invalid buffer_start format: expected 8 bytes")
  1069. assert.Nil(t, result)
  1070. }
  1071. func TestSQLEngine_BuildLogBufferDeduplicationMap_NoBrokerClient(t *testing.T) {
  1072. engine := NewTestSQLEngine()
  1073. engine.catalog.brokerClient = nil // Simulate no broker client
  1074. ctx := context.Background()
  1075. result, err := engine.buildLogBufferDeduplicationMap(ctx, "/topics/test/test-topic")
  1076. assert.NoError(t, err)
  1077. assert.NotNil(t, result)
  1078. assert.Empty(t, result)
  1079. }
  1080. func TestSQLEngine_LogBufferDeduplication_ServerRestartScenario(t *testing.T) {
  1081. // Simulate scenario: Buffer indexes are now initialized with process start time
  1082. // This tests that buffer start indexes are globally unique across server restarts
  1083. // Before server restart: Process 1 buffer start (3 chunks)
  1084. beforeRestartStart := LogBufferStart{
  1085. StartIndex: 1609459100000000000, // Process 1 start time
  1086. }
  1087. // After server restart: Process 2 buffer start (3 chunks)
  1088. afterRestartStart := LogBufferStart{
  1089. StartIndex: 1609459300000000000, // Process 2 start time (DIFFERENT)
  1090. }
  1091. // Simulate 3 chunks for each file
  1092. chunkCount := int64(3)
  1093. // Calculate end indexes for range comparison
  1094. beforeEnd := beforeRestartStart.StartIndex + chunkCount - 1 // [start, start+2]
  1095. afterStart := afterRestartStart.StartIndex // [start, start+2]
  1096. // Test range overlap detection (should NOT overlap)
  1097. overlaps := beforeRestartStart.StartIndex <= (afterStart+chunkCount-1) && beforeEnd >= afterStart
  1098. assert.False(t, overlaps, "Buffer ranges after restart should not overlap")
  1099. // Verify the start indexes are globally unique
  1100. assert.NotEqual(t, beforeRestartStart.StartIndex, afterRestartStart.StartIndex, "Start indexes should be different")
  1101. assert.Less(t, beforeEnd, afterStart, "Ranges should be completely separate")
  1102. // Expected values:
  1103. // Before restart: [1609459100000000000, 1609459100000000002]
  1104. // After restart: [1609459300000000000, 1609459300000000002]
  1105. expectedBeforeEnd := int64(1609459100000000002)
  1106. expectedAfterStart := int64(1609459300000000000)
  1107. assert.Equal(t, expectedBeforeEnd, beforeEnd)
  1108. assert.Equal(t, expectedAfterStart, afterStart)
  1109. // This demonstrates that buffer start indexes initialized with process start time
  1110. // prevent false positive duplicates across server restarts
  1111. }
  1112. func TestBrokerClient_BinaryBufferStartFormat(t *testing.T) {
  1113. // Test scenario: getBufferStartFromEntry should only support binary format
  1114. // This tests the standardized binary format for buffer_start metadata
  1115. realBrokerClient := &BrokerClient{}
  1116. // Test binary format (used by both log files and Parquet files)
  1117. binaryEntry := &filer_pb.Entry{
  1118. Name: "2025-01-07-14-30-45",
  1119. IsDirectory: false,
  1120. Extended: map[string][]byte{
  1121. "buffer_start": func() []byte {
  1122. // Binary format: 8-byte BigEndian
  1123. buf := make([]byte, 8)
  1124. binary.BigEndian.PutUint64(buf, uint64(2000001))
  1125. return buf
  1126. }(),
  1127. },
  1128. }
  1129. bufferStart := realBrokerClient.getBufferStartFromEntry(binaryEntry)
  1130. assert.NotNil(t, bufferStart)
  1131. assert.Equal(t, int64(2000001), bufferStart.StartIndex, "Should parse binary buffer_start metadata")
  1132. // Test Parquet file (same binary format)
  1133. parquetEntry := &filer_pb.Entry{
  1134. Name: "2025-01-07-14-30.parquet",
  1135. IsDirectory: false,
  1136. Extended: map[string][]byte{
  1137. "buffer_start": func() []byte {
  1138. buf := make([]byte, 8)
  1139. binary.BigEndian.PutUint64(buf, uint64(1500001))
  1140. return buf
  1141. }(),
  1142. },
  1143. }
  1144. bufferStart = realBrokerClient.getBufferStartFromEntry(parquetEntry)
  1145. assert.NotNil(t, bufferStart)
  1146. assert.Equal(t, int64(1500001), bufferStart.StartIndex, "Should parse binary buffer_start from Parquet file")
  1147. // Test missing metadata
  1148. emptyEntry := &filer_pb.Entry{
  1149. Name: "no-metadata",
  1150. IsDirectory: false,
  1151. Extended: nil,
  1152. }
  1153. bufferStart = realBrokerClient.getBufferStartFromEntry(emptyEntry)
  1154. assert.Nil(t, bufferStart, "Should return nil for entry without buffer_start metadata")
  1155. // Test invalid format (wrong size)
  1156. invalidEntry := &filer_pb.Entry{
  1157. Name: "invalid-metadata",
  1158. IsDirectory: false,
  1159. Extended: map[string][]byte{
  1160. "buffer_start": []byte("invalid"),
  1161. },
  1162. }
  1163. bufferStart = realBrokerClient.getBufferStartFromEntry(invalidEntry)
  1164. assert.Nil(t, bufferStart, "Should return nil for invalid buffer_start metadata")
  1165. }
  1166. // TestGetSQLValAlias tests the getSQLValAlias function, particularly for SQL injection prevention
  1167. func TestGetSQLValAlias(t *testing.T) {
  1168. engine := &SQLEngine{}
  1169. tests := []struct {
  1170. name string
  1171. sqlVal *SQLVal
  1172. expected string
  1173. desc string
  1174. }{
  1175. {
  1176. name: "simple string",
  1177. sqlVal: &SQLVal{
  1178. Type: StrVal,
  1179. Val: []byte("hello"),
  1180. },
  1181. expected: "'hello'",
  1182. desc: "Simple string should be wrapped in single quotes",
  1183. },
  1184. {
  1185. name: "string with single quote",
  1186. sqlVal: &SQLVal{
  1187. Type: StrVal,
  1188. Val: []byte("don't"),
  1189. },
  1190. expected: "'don''t'",
  1191. desc: "String with single quote should have the quote escaped by doubling it",
  1192. },
  1193. {
  1194. name: "string with multiple single quotes",
  1195. sqlVal: &SQLVal{
  1196. Type: StrVal,
  1197. Val: []byte("'malicious'; DROP TABLE users; --"),
  1198. },
  1199. expected: "'''malicious''; DROP TABLE users; --'",
  1200. desc: "String with SQL injection attempt should have all single quotes properly escaped",
  1201. },
  1202. {
  1203. name: "empty string",
  1204. sqlVal: &SQLVal{
  1205. Type: StrVal,
  1206. Val: []byte(""),
  1207. },
  1208. expected: "''",
  1209. desc: "Empty string should result in empty quoted string",
  1210. },
  1211. {
  1212. name: "integer value",
  1213. sqlVal: &SQLVal{
  1214. Type: IntVal,
  1215. Val: []byte("123"),
  1216. },
  1217. expected: "123",
  1218. desc: "Integer value should not be quoted",
  1219. },
  1220. {
  1221. name: "float value",
  1222. sqlVal: &SQLVal{
  1223. Type: FloatVal,
  1224. Val: []byte("123.45"),
  1225. },
  1226. expected: "123.45",
  1227. desc: "Float value should not be quoted",
  1228. },
  1229. }
  1230. for _, tt := range tests {
  1231. t.Run(tt.name, func(t *testing.T) {
  1232. result := engine.getSQLValAlias(tt.sqlVal)
  1233. assert.Equal(t, tt.expected, result, tt.desc)
  1234. })
  1235. }
  1236. }