| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- package schema
- import (
- "fmt"
- "strconv"
- parquet "github.com/parquet-go/parquet-go"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- )
- func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
- switch fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- // If value is missing, write NULL at the correct column to keep rows aligned
- if fieldValue == nil || fieldValue.Kind == nil {
- rowBuilder.Add(levels.startColumnIndex, parquet.NullValue())
- return nil
- }
- var parquetValue parquet.Value
- parquetValue, err = toParquetValueForType(fieldType, fieldValue)
- if err != nil {
- return
- }
- // Safety check: prevent nil byte arrays from reaching parquet library
- if parquetValue.Kind() == parquet.ByteArray {
- byteData := parquetValue.ByteArray()
- if byteData == nil {
- parquetValue = parquet.ByteArrayValue([]byte{})
- }
- }
- rowBuilder.Add(levels.startColumnIndex, parquetValue)
- case *schema_pb.Type_ListType:
- // Advance to list position even if value is missing
- rowBuilder.Next(levels.startColumnIndex)
- if fieldValue == nil || fieldValue.GetListValue() == nil {
- return nil
- }
- elementType := fieldType.GetListType().ElementType
- for _, value := range fieldValue.GetListValue().Values {
- if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil {
- return
- }
- }
- }
- return
- }
- func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error {
- visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
- return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
- }
- fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
- fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
- return doVisitValue(fieldType, parquetLevels, fieldValue, visitor)
- }
- // typeValueVisitor is a function that is called for each value in a schema_pb.Value
- // Find the column index.
- // intended to be used in RowBuilder.Add(columnIndex, value)
- type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error)
- // endIndex is exclusive
- // same logic as RowBuilder.configure in row_builder.go
- func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
- switch fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- return visitor(fieldType, levels, fieldValue)
- case *schema_pb.Type_ListType:
- return visitor(fieldType, levels, fieldValue)
- case *schema_pb.Type_RecordType:
- for _, field := range fieldType.GetRecordType().Fields {
- var fv *schema_pb.Value
- if fieldValue != nil && fieldValue.GetRecordValue() != nil {
- var found bool
- fv, found = fieldValue.GetRecordValue().Fields[field.Name]
- if !found {
- // pass nil so visitor can emit NULL for alignment
- fv = nil
- }
- }
- fieldLevels := levels.levels[field.Name]
- err = doVisitValue(field.Type, fieldLevels, fv, visitor)
- if err != nil {
- return
- }
- }
- return
- }
- return
- }
- func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
- // Safety check for nil value
- if value == nil || value.Kind == nil {
- return parquet.NullValue(), fmt.Errorf("nil value or nil value kind")
- }
- switch value.Kind.(type) {
- case *schema_pb.Value_BoolValue:
- return parquet.BooleanValue(value.GetBoolValue()), nil
- case *schema_pb.Value_Int32Value:
- return parquet.Int32Value(value.GetInt32Value()), nil
- case *schema_pb.Value_Int64Value:
- return parquet.Int64Value(value.GetInt64Value()), nil
- case *schema_pb.Value_FloatValue:
- return parquet.FloatValue(value.GetFloatValue()), nil
- case *schema_pb.Value_DoubleValue:
- return parquet.DoubleValue(value.GetDoubleValue()), nil
- case *schema_pb.Value_BytesValue:
- // Handle nil byte slices to prevent growslice panic in parquet-go
- byteData := value.GetBytesValue()
- if byteData == nil {
- byteData = []byte{} // Use empty slice instead of nil
- }
- return parquet.ByteArrayValue(byteData), nil
- case *schema_pb.Value_StringValue:
- // Convert string to bytes, ensuring we never pass nil
- stringData := value.GetStringValue()
- return parquet.ByteArrayValue([]byte(stringData)), nil
- // Parquet logical types with safe conversion (preventing commit 7a4aeec60 panic)
- case *schema_pb.Value_TimestampValue:
- timestampValue := value.GetTimestampValue()
- if timestampValue == nil {
- return parquet.NullValue(), nil
- }
- return parquet.Int64Value(timestampValue.TimestampMicros), nil
- case *schema_pb.Value_DateValue:
- dateValue := value.GetDateValue()
- if dateValue == nil {
- return parquet.NullValue(), nil
- }
- return parquet.Int32Value(dateValue.DaysSinceEpoch), nil
- case *schema_pb.Value_DecimalValue:
- decimalValue := value.GetDecimalValue()
- if decimalValue == nil || decimalValue.Value == nil || len(decimalValue.Value) == 0 {
- return parquet.NullValue(), nil
- }
- // Validate input data - reject unreasonably large values instead of corrupting data
- if len(decimalValue.Value) > 64 {
- // Reject extremely large decimal values (>512 bits) as likely corrupted data
- // Better to fail fast than silently corrupt financial/scientific data
- return parquet.NullValue(), fmt.Errorf("decimal value too large: %d bytes (max 64)", len(decimalValue.Value))
- }
- // Convert to FixedLenByteArray to match schema (DECIMAL with FixedLenByteArray physical type)
- // This accommodates any precision up to 38 digits (16 bytes = 128 bits)
- // Pad or truncate to exactly 16 bytes for FixedLenByteArray
- fixedBytes := make([]byte, 16)
- if len(decimalValue.Value) <= 16 {
- // Right-align the value (big-endian)
- copy(fixedBytes[16-len(decimalValue.Value):], decimalValue.Value)
- } else {
- // Truncate if too large, taking the least significant bytes
- copy(fixedBytes, decimalValue.Value[len(decimalValue.Value)-16:])
- }
- return parquet.FixedLenByteArrayValue(fixedBytes), nil
- case *schema_pb.Value_TimeValue:
- timeValue := value.GetTimeValue()
- if timeValue == nil {
- return parquet.NullValue(), nil
- }
- return parquet.Int64Value(timeValue.TimeMicros), nil
- default:
- return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
- }
- }
- // toParquetValueForType coerces a schema_pb.Value into a parquet.Value that matches the declared field type.
- func toParquetValueForType(fieldType *schema_pb.Type, value *schema_pb.Value) (parquet.Value, error) {
- switch t := fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- switch t.ScalarType {
- case schema_pb.ScalarType_BOOL:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_BoolValue:
- return parquet.BooleanValue(v.BoolValue), nil
- case *schema_pb.Value_StringValue:
- if b, err := strconv.ParseBool(v.StringValue); err == nil {
- return parquet.BooleanValue(b), nil
- }
- return parquet.BooleanValue(false), nil
- default:
- return parquet.BooleanValue(false), nil
- }
- case schema_pb.ScalarType_INT32:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_Int32Value:
- return parquet.Int32Value(v.Int32Value), nil
- case *schema_pb.Value_Int64Value:
- return parquet.Int32Value(int32(v.Int64Value)), nil
- case *schema_pb.Value_DoubleValue:
- return parquet.Int32Value(int32(v.DoubleValue)), nil
- case *schema_pb.Value_StringValue:
- if i, err := strconv.ParseInt(v.StringValue, 10, 32); err == nil {
- return parquet.Int32Value(int32(i)), nil
- }
- return parquet.Int32Value(0), nil
- default:
- return parquet.Int32Value(0), nil
- }
- case schema_pb.ScalarType_INT64:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- return parquet.Int64Value(v.Int64Value), nil
- case *schema_pb.Value_Int32Value:
- return parquet.Int64Value(int64(v.Int32Value)), nil
- case *schema_pb.Value_DoubleValue:
- return parquet.Int64Value(int64(v.DoubleValue)), nil
- case *schema_pb.Value_StringValue:
- if i, err := strconv.ParseInt(v.StringValue, 10, 64); err == nil {
- return parquet.Int64Value(i), nil
- }
- return parquet.Int64Value(0), nil
- default:
- return parquet.Int64Value(0), nil
- }
- case schema_pb.ScalarType_FLOAT:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_FloatValue:
- return parquet.FloatValue(v.FloatValue), nil
- case *schema_pb.Value_DoubleValue:
- return parquet.FloatValue(float32(v.DoubleValue)), nil
- case *schema_pb.Value_Int64Value:
- return parquet.FloatValue(float32(v.Int64Value)), nil
- case *schema_pb.Value_StringValue:
- if f, err := strconv.ParseFloat(v.StringValue, 32); err == nil {
- return parquet.FloatValue(float32(f)), nil
- }
- return parquet.FloatValue(0), nil
- default:
- return parquet.FloatValue(0), nil
- }
- case schema_pb.ScalarType_DOUBLE:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_DoubleValue:
- return parquet.DoubleValue(v.DoubleValue), nil
- case *schema_pb.Value_Int64Value:
- return parquet.DoubleValue(float64(v.Int64Value)), nil
- case *schema_pb.Value_Int32Value:
- return parquet.DoubleValue(float64(v.Int32Value)), nil
- case *schema_pb.Value_StringValue:
- if f, err := strconv.ParseFloat(v.StringValue, 64); err == nil {
- return parquet.DoubleValue(f), nil
- }
- return parquet.DoubleValue(0), nil
- default:
- return parquet.DoubleValue(0), nil
- }
- case schema_pb.ScalarType_BYTES:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_BytesValue:
- b := v.BytesValue
- if b == nil {
- b = []byte{}
- }
- return parquet.ByteArrayValue(b), nil
- case *schema_pb.Value_StringValue:
- return parquet.ByteArrayValue([]byte(v.StringValue)), nil
- case *schema_pb.Value_Int64Value:
- return parquet.ByteArrayValue([]byte(strconv.FormatInt(v.Int64Value, 10))), nil
- case *schema_pb.Value_Int32Value:
- return parquet.ByteArrayValue([]byte(strconv.FormatInt(int64(v.Int32Value), 10))), nil
- case *schema_pb.Value_DoubleValue:
- return parquet.ByteArrayValue([]byte(strconv.FormatFloat(v.DoubleValue, 'f', -1, 64))), nil
- case *schema_pb.Value_FloatValue:
- return parquet.ByteArrayValue([]byte(strconv.FormatFloat(float64(v.FloatValue), 'f', -1, 32))), nil
- case *schema_pb.Value_BoolValue:
- if v.BoolValue {
- return parquet.ByteArrayValue([]byte("true")), nil
- }
- return parquet.ByteArrayValue([]byte("false")), nil
- default:
- return parquet.ByteArrayValue([]byte{}), nil
- }
- case schema_pb.ScalarType_STRING:
- // Same as bytes but semantically string
- switch v := value.Kind.(type) {
- case *schema_pb.Value_StringValue:
- return parquet.ByteArrayValue([]byte(v.StringValue)), nil
- default:
- // Fallback through bytes coercion
- b, _ := toParquetValueForType(&schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES}}, value)
- return b, nil
- }
- case schema_pb.ScalarType_TIMESTAMP:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- return parquet.Int64Value(v.Int64Value), nil
- case *schema_pb.Value_StringValue:
- if i, err := strconv.ParseInt(v.StringValue, 10, 64); err == nil {
- return parquet.Int64Value(i), nil
- }
- return parquet.Int64Value(0), nil
- default:
- return parquet.Int64Value(0), nil
- }
- case schema_pb.ScalarType_DATE:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_Int32Value:
- return parquet.Int32Value(v.Int32Value), nil
- case *schema_pb.Value_Int64Value:
- return parquet.Int32Value(int32(v.Int64Value)), nil
- case *schema_pb.Value_StringValue:
- if i, err := strconv.ParseInt(v.StringValue, 10, 32); err == nil {
- return parquet.Int32Value(int32(i)), nil
- }
- return parquet.Int32Value(0), nil
- default:
- return parquet.Int32Value(0), nil
- }
- case schema_pb.ScalarType_DECIMAL:
- // Reuse existing conversion path (FixedLenByteArray 16)
- return toParquetValue(value)
- case schema_pb.ScalarType_TIME:
- switch v := value.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- return parquet.Int64Value(v.Int64Value), nil
- case *schema_pb.Value_StringValue:
- if i, err := strconv.ParseInt(v.StringValue, 10, 64); err == nil {
- return parquet.Int64Value(i), nil
- }
- return parquet.Int64Value(0), nil
- default:
- return parquet.Int64Value(0), nil
- }
- }
- }
- // Fallback to generic conversion
- return toParquetValue(value)
- }
|