// schema_parsing_test.go (5.5 KB)
  1. package engine
  2. import (
  3. "context"
  4. "testing"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. // TestSchemaAwareParsing tests the schema-aware message parsing functionality
  8. func TestSchemaAwareParsing(t *testing.T) {
  9. // Create a mock HybridMessageScanner with schema
  10. recordSchema := &schema_pb.RecordType{
  11. Fields: []*schema_pb.Field{
  12. {
  13. Name: "user_id",
  14. Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}},
  15. },
  16. {
  17. Name: "event_type",
  18. Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
  19. },
  20. {
  21. Name: "cpu_usage",
  22. Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}},
  23. },
  24. {
  25. Name: "is_active",
  26. Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BOOL}},
  27. },
  28. },
  29. }
  30. scanner := &HybridMessageScanner{
  31. recordSchema: recordSchema,
  32. }
  33. t.Run("JSON Message Parsing", func(t *testing.T) {
  34. jsonData := []byte(`{"user_id": 1234, "event_type": "login", "cpu_usage": 75.5, "is_active": true}`)
  35. result, err := scanner.parseJSONMessage(jsonData)
  36. if err != nil {
  37. t.Fatalf("Failed to parse JSON message: %v", err)
  38. }
  39. // Verify user_id as int32
  40. if userIdVal := result.Fields["user_id"]; userIdVal == nil {
  41. t.Error("user_id field missing")
  42. } else if userIdVal.GetInt32Value() != 1234 {
  43. t.Errorf("Expected user_id=1234, got %v", userIdVal.GetInt32Value())
  44. }
  45. // Verify event_type as string
  46. if eventTypeVal := result.Fields["event_type"]; eventTypeVal == nil {
  47. t.Error("event_type field missing")
  48. } else if eventTypeVal.GetStringValue() != "login" {
  49. t.Errorf("Expected event_type='login', got %v", eventTypeVal.GetStringValue())
  50. }
  51. // Verify cpu_usage as double
  52. if cpuVal := result.Fields["cpu_usage"]; cpuVal == nil {
  53. t.Error("cpu_usage field missing")
  54. } else if cpuVal.GetDoubleValue() != 75.5 {
  55. t.Errorf("Expected cpu_usage=75.5, got %v", cpuVal.GetDoubleValue())
  56. }
  57. // Verify is_active as bool
  58. if isActiveVal := result.Fields["is_active"]; isActiveVal == nil {
  59. t.Error("is_active field missing")
  60. } else if !isActiveVal.GetBoolValue() {
  61. t.Errorf("Expected is_active=true, got %v", isActiveVal.GetBoolValue())
  62. }
  63. t.Logf("JSON parsing correctly converted types: int32=%d, string='%s', double=%.1f, bool=%v",
  64. result.Fields["user_id"].GetInt32Value(),
  65. result.Fields["event_type"].GetStringValue(),
  66. result.Fields["cpu_usage"].GetDoubleValue(),
  67. result.Fields["is_active"].GetBoolValue())
  68. })
  69. t.Run("Raw Data Type Conversion", func(t *testing.T) {
  70. // Test string conversion
  71. stringType := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}
  72. stringVal, err := scanner.convertRawDataToSchemaValue([]byte("hello world"), stringType)
  73. if err != nil {
  74. t.Errorf("Failed to convert string: %v", err)
  75. } else if stringVal.GetStringValue() != "hello world" {
  76. t.Errorf("String conversion failed: got %v", stringVal.GetStringValue())
  77. }
  78. // Test int32 conversion
  79. int32Type := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}
  80. int32Val, err := scanner.convertRawDataToSchemaValue([]byte("42"), int32Type)
  81. if err != nil {
  82. t.Errorf("Failed to convert int32: %v", err)
  83. } else if int32Val.GetInt32Value() != 42 {
  84. t.Errorf("Int32 conversion failed: got %v", int32Val.GetInt32Value())
  85. }
  86. // Test double conversion
  87. doubleType := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}
  88. doubleVal, err := scanner.convertRawDataToSchemaValue([]byte("3.14159"), doubleType)
  89. if err != nil {
  90. t.Errorf("Failed to convert double: %v", err)
  91. } else if doubleVal.GetDoubleValue() != 3.14159 {
  92. t.Errorf("Double conversion failed: got %v", doubleVal.GetDoubleValue())
  93. }
  94. // Test bool conversion
  95. boolType := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BOOL}}
  96. boolVal, err := scanner.convertRawDataToSchemaValue([]byte("true"), boolType)
  97. if err != nil {
  98. t.Errorf("Failed to convert bool: %v", err)
  99. } else if !boolVal.GetBoolValue() {
  100. t.Errorf("Bool conversion failed: got %v", boolVal.GetBoolValue())
  101. }
  102. t.Log("Raw data type conversions working correctly")
  103. })
  104. t.Run("Invalid JSON Graceful Handling", func(t *testing.T) {
  105. invalidJSON := []byte(`{"user_id": 1234, "malformed": }`)
  106. _, err := scanner.parseJSONMessage(invalidJSON)
  107. if err == nil {
  108. t.Error("Expected error for invalid JSON, but got none")
  109. }
  110. t.Log("Invalid JSON handled gracefully with error")
  111. })
  112. }
  113. // TestSchemaAwareParsingIntegration tests the full integration with SQL engine
  114. func TestSchemaAwareParsingIntegration(t *testing.T) {
  115. engine := NewTestSQLEngine()
  116. // Test that the enhanced schema-aware parsing doesn't break existing functionality
  117. result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events LIMIT 2")
  118. if err != nil {
  119. t.Fatalf("Schema-aware parsing broke basic SELECT: %v", err)
  120. }
  121. if len(result.Rows) == 0 {
  122. t.Error("No rows returned - schema parsing may have issues")
  123. }
  124. // Check that _source column is still present (hybrid functionality)
  125. foundSourceColumn := false
  126. for _, col := range result.Columns {
  127. if col == "_source" {
  128. foundSourceColumn = true
  129. break
  130. }
  131. }
  132. if !foundSourceColumn {
  133. t.Log("_source column missing - running in fallback mode without real cluster")
  134. }
  135. t.Log("Schema-aware parsing integrates correctly with SQL engine")
  136. }