to_schema_value.go

package schema

import (
	"bytes"
	"fmt"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// ToRecordValue converts a parquet.Row to a schema_pb.RecordValue.
// This has not been tested with nested structures and may fail to
// convert rows that contain them.
func ToRecordValue(recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, row parquet.Row) (*schema_pb.RecordValue, error) {
	values := []parquet.Value(row)
	recordValue, _, err := toRecordValue(recordType, parquetLevels, values, 0)
	if err != nil {
		return nil, err
	}
	return recordValue.GetRecordValue(), nil
}
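
// rowToRecord is an illustrative usage sketch, not part of the original file.
// It assumes ToParquetLevels is the companion function in this package that
// computes the column layout for a record type.
func rowToRecord(recordType *schema_pb.RecordType, row parquet.Row) (*schema_pb.RecordValue, error) {
	levels, err := ToParquetLevels(recordType)
	if err != nil {
		return nil, err
	}
	return ToRecordValue(recordType, levels, row)
}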

// ToValue decodes one value (scalar, list, or record) starting at
// values[valueIndex] and returns it together with the index of the first
// value it did not consume.
func ToValue(t *schema_pb.Type, levels *ParquetLevels, values []parquet.Value, valueIndex int) (value *schema_pb.Value, endValueIndex int, err error) {
	switch t.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		return toScalarValue(t.GetScalarType(), levels, values, valueIndex)
	case *schema_pb.Type_ListType:
		return toListValue(t.GetListType(), levels, values, valueIndex)
	case *schema_pb.Type_RecordType:
		return toRecordValue(t.GetRecordType(), levels, values, valueIndex)
	}
	return nil, valueIndex, fmt.Errorf("unsupported type: %v", t)
}

// toRecordValue decodes each field of recordType in declaration order,
// advancing valueIndex past the values each field consumed, and assembles
// the results into a RecordValue.
func toRecordValue(recordType *schema_pb.RecordType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (*schema_pb.Value, int, error) {
	recordValue := schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
	for _, field := range recordType.Fields {
		fieldLevels := levels.levels[field.Name]
		fieldValue, endValueIndex, err := ToValue(field.Type, fieldLevels, values, valueIndex)
		if err != nil {
			return nil, 0, err
		}
		valueIndex = endValueIndex
		recordValue.Fields[field.Name] = fieldValue
	}
	return &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: &recordValue}}, valueIndex, nil
}
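
// exampleFlattenedRecord is an illustrative sketch; the field names and
// column indexes are hypothetical. A record such as {id int32, name string}
// arrives as one parquet.Value per scalar field, ordered by column index,
// which is the flat layout toRecordValue walks with valueIndex.
func exampleFlattenedRecord() []parquet.Value {
	return []parquet.Value{
		parquet.ValueOf(int32(42)).Level(0, 0, 0), // field "id" at column 0
		parquet.ValueOf("alice").Level(0, 0, 1),   // field "name" at column 1
	}
}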

// toListValue collects consecutive values belonging to the list's column
// into a ListValue, stopping at the first value whose column index differs.
func toListValue(listType *schema_pb.ListType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (listValue *schema_pb.Value, endValueIndex int, err error) {
	listValues := make([]*schema_pb.Value, 0)
	var value *schema_pb.Value
	for valueIndex < len(values) {
		if values[valueIndex].Column() != levels.startColumnIndex {
			break
		}
		value, valueIndex, err = ToValue(listType.ElementType, levels, values, valueIndex)
		if err != nil {
			return nil, valueIndex, err
		}
		listValues = append(listValues, value)
	}
	return &schema_pb.Value{Kind: &schema_pb.Value_ListValue{ListValue: &schema_pb.ListValue{Values: listValues}}}, valueIndex, nil
}
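
// exampleListValues is an illustrative sketch; the column index and level
// values are hypothetical. All elements of a list share the list's column
// index and appear consecutively, so toListValue keeps consuming until the
// column index changes.
func exampleListValues() []parquet.Value {
	return []parquet.Value{
		parquet.ValueOf(int32(7)).Level(0, 1, 0), // first element of the list
		parquet.ValueOf(int32(9)).Level(1, 1, 0), // repeated element, same column
	}
}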

// toScalarValue decodes a single scalar at values[valueIndex]. If the value
// belongs to a different column, it returns a nil value without an error and
// leaves valueIndex unchanged.
func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (*schema_pb.Value, int, error) {
	value := values[valueIndex]
	if value.Column() != levels.startColumnIndex {
		return nil, valueIndex, nil
	}
	switch scalarType {
	case schema_pb.ScalarType_BOOL:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_INT32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_INT64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_FLOAT:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_DOUBLE:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_BYTES:
		// Handle nil byte arrays from parquet to prevent growslice panic
		byteData := value.ByteArray()
		if byteData == nil {
			byteData = []byte{} // Use empty slice instead of nil
		}
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: byteData}}, valueIndex + 1, nil
	case schema_pb.ScalarType_STRING:
		// Handle nil byte arrays from parquet to prevent string conversion issues
		byteData := value.ByteArray()
		if byteData == nil {
			byteData = []byte{} // Use empty slice instead of nil
		}
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(byteData)}}, valueIndex + 1, nil

	// Parquet logical types: convert from their physical storage back to logical values.
	case schema_pb.ScalarType_TIMESTAMP:
		// Stored as INT64; convert back to TimestampValue
		return &schema_pb.Value{
			Kind: &schema_pb.Value_TimestampValue{
				TimestampValue: &schema_pb.TimestampValue{
					TimestampMicros: value.Int64(),
					IsUtc:           true, // Default to UTC for compatibility
				},
			},
		}, valueIndex + 1, nil
	case schema_pb.ScalarType_DATE:
		// Stored as INT32; convert back to DateValue
		return &schema_pb.Value{
			Kind: &schema_pb.Value_DateValue{
				DateValue: &schema_pb.DateValue{
					DaysSinceEpoch: value.Int32(),
				},
			},
		}, valueIndex + 1, nil
	case schema_pb.ScalarType_DECIMAL:
		// Stored as FixedLenByteArray, which also uses the ByteArray() accessor
		fixedBytes := value.ByteArray()
		if fixedBytes == nil {
			fixedBytes = []byte{} // Use empty slice instead of nil
		}
		// Remove leading zeros to get the minimal representation
		trimmedBytes := bytes.TrimLeft(fixedBytes, "\x00")
		if len(trimmedBytes) == 0 {
			trimmedBytes = []byte{0} // Ensure at least one byte so zero stays representable
		}
		return &schema_pb.Value{
			Kind: &schema_pb.Value_DecimalValue{
				DecimalValue: &schema_pb.DecimalValue{
					Value:     trimmedBytes,
					Precision: 38, // Maximum precision supported by the schema
					Scale:     18, // Maximum scale supported by the schema
				},
			},
		}, valueIndex + 1, nil
	case schema_pb.ScalarType_TIME:
		// Stored as INT64; convert back to TimeValue
		return &schema_pb.Value{
			Kind: &schema_pb.Value_TimeValue{
				TimeValue: &schema_pb.TimeValue{
					TimeMicros: value.Int64(),
				},
			},
		}, valueIndex + 1, nil
	}
	return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType)
}
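
// normalizeDecimalBytes is an illustrative sketch of the trimming rule used
// in the DECIMAL case above, shown in isolation. It assumes the unscaled
// value is a non-negative big-endian integer. For example,
// normalizeDecimalBytes([]byte{0x00, 0x00, 0x01, 0x2c}) returns
// []byte{0x01, 0x2c}, i.e. the unscaled value 300.
func normalizeDecimalBytes(fixed []byte) []byte {
	trimmed := bytes.TrimLeft(fixed, "\x00")
	if len(trimmed) == 0 {
		return []byte{0} // keep one byte so zero stays representable
	}
	return trimmed
}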