| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- package engine
- import (
- "context"
- "fmt"
- "strings"
- "testing"
- )
- func TestSQLEngine_HybridSelectBasic(t *testing.T) {
- engine := NewTestSQLEngine()
- // Test SELECT with _source column to show both live and archived data
- result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events")
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
- if result.Error != nil {
- t.Fatalf("Expected no query error, got %v", result.Error)
- }
- if len(result.Columns) == 0 {
- t.Error("Expected columns in result")
- }
- // In mock environment, we only get live_log data from unflushed messages
- // parquet_archive data would come from parquet files in a real system
- if len(result.Rows) == 0 {
- t.Error("Expected rows in result")
- }
- // Check that we have the _source column showing data source
- hasSourceColumn := false
- sourceColumnIndex := -1
- for i, column := range result.Columns {
- if column == SW_COLUMN_NAME_SOURCE {
- hasSourceColumn = true
- sourceColumnIndex = i
- break
- }
- }
- if !hasSourceColumn {
- t.Skip("_source column not available in fallback mode - test requires real SeaweedFS cluster")
- }
- // Verify we have the expected data sources (in mock environment, only live_log)
- if hasSourceColumn && sourceColumnIndex >= 0 {
- foundLiveLog := false
- for _, row := range result.Rows {
- if sourceColumnIndex < len(row) {
- source := row[sourceColumnIndex].ToString()
- if source == "live_log" {
- foundLiveLog = true
- }
- // In mock environment, all data comes from unflushed messages (live_log)
- // In a real system, we would also see parquet_archive from parquet files
- }
- }
- if !foundLiveLog {
- t.Error("Expected to find live_log data source in results")
- }
- t.Logf("Found live_log data source from unflushed messages")
- }
- }
- func TestSQLEngine_HybridSelectWithLimit(t *testing.T) {
- engine := NewTestSQLEngine()
- // Test SELECT with LIMIT on hybrid data
- result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events LIMIT 2")
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
- if result.Error != nil {
- t.Fatalf("Expected no query error, got %v", result.Error)
- }
- // Should have exactly 2 rows due to LIMIT
- if len(result.Rows) != 2 {
- t.Errorf("Expected 2 rows with LIMIT 2, got %d", len(result.Rows))
- }
- }
- func TestSQLEngine_HybridSelectDifferentTables(t *testing.T) {
- engine := NewTestSQLEngine()
- // Test both user_events and system_logs tables
- tables := []string{"user_events", "system_logs"}
- for _, tableName := range tables {
- result, err := engine.ExecuteSQL(context.Background(), fmt.Sprintf("SELECT *, _source FROM %s", tableName))
- if err != nil {
- t.Errorf("Error querying hybrid table %s: %v", tableName, err)
- continue
- }
- if result.Error != nil {
- t.Errorf("Query error for hybrid table %s: %v", tableName, result.Error)
- continue
- }
- if len(result.Columns) == 0 {
- t.Errorf("No columns returned for hybrid table %s", tableName)
- }
- if len(result.Rows) == 0 {
- t.Errorf("No rows returned for hybrid table %s", tableName)
- }
- // Check for _source column
- hasSourceColumn := false
- for _, column := range result.Columns {
- if column == "_source" {
- hasSourceColumn = true
- break
- }
- }
- if !hasSourceColumn {
- t.Logf("Table %s missing _source column - running in fallback mode", tableName)
- }
- t.Logf("Table %s: %d columns, %d rows with hybrid data sources", tableName, len(result.Columns), len(result.Rows))
- }
- }
- func TestSQLEngine_HybridDataSource(t *testing.T) {
- engine := NewTestSQLEngine()
- // Test that we can distinguish between live and archived data
- result, err := engine.ExecuteSQL(context.Background(), "SELECT user_id, event_type, _source FROM user_events")
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
- if result.Error != nil {
- t.Fatalf("Expected no query error, got %v", result.Error)
- }
- // Find the _source column
- sourceColumnIndex := -1
- eventTypeColumnIndex := -1
- for i, column := range result.Columns {
- switch column {
- case "_source":
- sourceColumnIndex = i
- case "event_type":
- eventTypeColumnIndex = i
- }
- }
- if sourceColumnIndex == -1 {
- t.Skip("Could not find _source column - test requires real SeaweedFS cluster")
- }
- if eventTypeColumnIndex == -1 {
- t.Fatal("Could not find event_type column")
- }
- // Check the data characteristics
- liveEventFound := false
- archivedEventFound := false
- for _, row := range result.Rows {
- if sourceColumnIndex < len(row) && eventTypeColumnIndex < len(row) {
- source := row[sourceColumnIndex].ToString()
- eventType := row[eventTypeColumnIndex].ToString()
- if source == "live_log" && strings.Contains(eventType, "live_") {
- liveEventFound = true
- t.Logf("Found live event: %s from %s", eventType, source)
- }
- if source == "parquet_archive" && strings.Contains(eventType, "archived_") {
- archivedEventFound = true
- t.Logf("Found archived event: %s from %s", eventType, source)
- }
- }
- }
- if !liveEventFound {
- t.Error("Expected to find live events with live_ prefix")
- }
- if !archivedEventFound {
- t.Error("Expected to find archived events with archived_ prefix")
- }
- }
- func TestSQLEngine_HybridSystemLogs(t *testing.T) {
- engine := NewTestSQLEngine()
- // Test system_logs with hybrid data
- result, err := engine.ExecuteSQL(context.Background(), "SELECT level, message, service, _source FROM system_logs")
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
- if result.Error != nil {
- t.Fatalf("Expected no query error, got %v", result.Error)
- }
- // Should have both live and archived system logs
- if len(result.Rows) < 2 {
- t.Errorf("Expected at least 2 system log entries, got %d", len(result.Rows))
- }
- // Find column indices
- levelIndex := -1
- sourceIndex := -1
- for i, column := range result.Columns {
- switch column {
- case "level":
- levelIndex = i
- case "_source":
- sourceIndex = i
- }
- }
- // Verify we have both live and archived system logs
- foundLive := false
- foundArchived := false
- for _, row := range result.Rows {
- if sourceIndex >= 0 && sourceIndex < len(row) {
- source := row[sourceIndex].ToString()
- if source == "live_log" {
- foundLive = true
- if levelIndex >= 0 && levelIndex < len(row) {
- level := row[levelIndex].ToString()
- t.Logf("Live system log: level=%s", level)
- }
- }
- if source == "parquet_archive" {
- foundArchived = true
- if levelIndex >= 0 && levelIndex < len(row) {
- level := row[levelIndex].ToString()
- t.Logf("Archived system log: level=%s", level)
- }
- }
- }
- }
- if !foundLive {
- t.Log("No live system logs found - running in fallback mode")
- }
- if !foundArchived {
- t.Log("No archived system logs found - running in fallback mode")
- }
- }
- func TestSQLEngine_HybridSelectWithTimeImplications(t *testing.T) {
- engine := NewTestSQLEngine()
- // Test that demonstrates the time-based nature of hybrid data
- // Live data should be more recent than archived data
- result, err := engine.ExecuteSQL(context.Background(), "SELECT event_type, _source FROM user_events")
- if err != nil {
- t.Fatalf("Expected no error, got %v", err)
- }
- if result.Error != nil {
- t.Fatalf("Expected no query error, got %v", result.Error)
- }
- // This test documents that hybrid scanning provides a complete view
- // of both recent (live) and historical (archived) data in a single query
- liveCount := 0
- archivedCount := 0
- sourceIndex := -1
- for i, column := range result.Columns {
- if column == "_source" {
- sourceIndex = i
- break
- }
- }
- if sourceIndex >= 0 {
- for _, row := range result.Rows {
- if sourceIndex < len(row) {
- source := row[sourceIndex].ToString()
- switch source {
- case "live_log":
- liveCount++
- case "parquet_archive":
- archivedCount++
- }
- }
- }
- }
- t.Logf("Hybrid query results: %d live messages, %d archived messages", liveCount, archivedCount)
- if liveCount == 0 && archivedCount == 0 {
- t.Log("No live or archived messages found - running in fallback mode")
- }
- }
|