to_parquet_schema.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package schema
  2. import (
  3. "fmt"
  4. parquet "github.com/parquet-go/parquet-go"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  7. func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
  8. rootNode, err := toParquetFieldTypeRecord(recordType)
  9. if err != nil {
  10. return nil, fmt.Errorf("failed to convert record type to parquet schema: %w", err)
  11. }
  12. // Fields are sorted by name, so the value should be sorted also
  13. // the sorting is inside parquet.`func (g Group) Fields() []Field`
  14. return parquet.NewSchema(topicName, rootNode), nil
  15. }
  16. func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
  17. // This is the old function - now defaults to Optional for backward compatibility
  18. return toParquetFieldTypeWithRequirement(fieldType, false)
  19. }
  20. func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
  21. elementType, err := toParquetFieldType(listType.ElementType)
  22. if err != nil {
  23. return nil, err
  24. }
  25. return parquet.Repeated(elementType), nil
  26. }
  27. func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
  28. switch scalarType {
  29. case schema_pb.ScalarType_BOOL:
  30. return parquet.Leaf(parquet.BooleanType), nil
  31. case schema_pb.ScalarType_INT32:
  32. return parquet.Leaf(parquet.Int32Type), nil
  33. case schema_pb.ScalarType_INT64:
  34. return parquet.Leaf(parquet.Int64Type), nil
  35. case schema_pb.ScalarType_FLOAT:
  36. return parquet.Leaf(parquet.FloatType), nil
  37. case schema_pb.ScalarType_DOUBLE:
  38. return parquet.Leaf(parquet.DoubleType), nil
  39. case schema_pb.ScalarType_BYTES:
  40. return parquet.Leaf(parquet.ByteArrayType), nil
  41. case schema_pb.ScalarType_STRING:
  42. return parquet.Leaf(parquet.ByteArrayType), nil
  43. // Parquet logical types - map to their physical storage types
  44. case schema_pb.ScalarType_TIMESTAMP:
  45. // Stored as INT64 (microseconds since Unix epoch)
  46. return parquet.Leaf(parquet.Int64Type), nil
  47. case schema_pb.ScalarType_DATE:
  48. // Stored as INT32 (days since Unix epoch)
  49. return parquet.Leaf(parquet.Int32Type), nil
  50. case schema_pb.ScalarType_DECIMAL:
  51. // Use maximum precision/scale to accommodate any decimal value
  52. // Per Parquet spec: precision ≤9→INT32, ≤18→INT64, >18→FixedLenByteArray
  53. // Using precision=38 (max for most systems), scale=18 for flexibility
  54. // Individual values can have smaller precision/scale, but schema supports maximum
  55. return parquet.Decimal(18, 38, parquet.FixedLenByteArrayType(16)), nil
  56. case schema_pb.ScalarType_TIME:
  57. // Stored as INT64 (microseconds since midnight)
  58. return parquet.Leaf(parquet.Int64Type), nil
  59. default:
  60. return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
  61. }
  62. }
  63. func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
  64. recordNode := parquet.Group{}
  65. for _, field := range recordType.Fields {
  66. parquetFieldType, err := toParquetFieldTypeWithRequirement(field.Type, field.IsRequired)
  67. if err != nil {
  68. return nil, err
  69. }
  70. recordNode[field.Name] = parquetFieldType
  71. }
  72. return recordNode, nil
  73. }
  74. // toParquetFieldTypeWithRequirement creates parquet field type respecting required/optional constraints
  75. func toParquetFieldTypeWithRequirement(fieldType *schema_pb.Type, isRequired bool) (dataType parquet.Node, err error) {
  76. switch fieldType.Kind.(type) {
  77. case *schema_pb.Type_ScalarType:
  78. dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
  79. if err != nil {
  80. return nil, err
  81. }
  82. if isRequired {
  83. // Required fields are NOT wrapped in Optional
  84. return dataType, nil
  85. } else {
  86. // Optional fields are wrapped in Optional
  87. return parquet.Optional(dataType), nil
  88. }
  89. case *schema_pb.Type_RecordType:
  90. dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
  91. if err != nil {
  92. return nil, err
  93. }
  94. if isRequired {
  95. return dataType, nil
  96. } else {
  97. return parquet.Optional(dataType), nil
  98. }
  99. case *schema_pb.Type_ListType:
  100. dataType, err = toParquetFieldTypeList(fieldType.GetListType())
  101. if err != nil {
  102. return nil, err
  103. }
  104. // Lists are typically optional by nature
  105. return dataType, nil
  106. default:
  107. return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
  108. }
  109. }