From 402da83c7d976184e1692c78592805a3c39b0476 Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Sat, 13 Nov 2021 19:49:57 +0800 Subject: [PATCH] [HUDI-2756] Fix flink parquet writer decimal type conversion --- .../row/parquet/ParquetSchemaConverter.java | 2 +- .../format/cow/FixedLenBytesColumnReader.java | 85 +++++++++++++++++++ .../format/cow/ParquetSplitReaderUtil.java | 3 +- 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java index 5187660c8caec..80fda29aa4756 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java @@ -564,7 +564,7 @@ private static Type convertToParquetType( int scale = ((DecimalType) type).getScale(); int numBytes = computeMinBytesForDecimalPrecision(precision); return Types.primitive( - PrimitiveType.PrimitiveTypeName.BINARY, repetition) + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition) .precision(precision) .scale(scale) .length(numBytes) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java new file mode 100644 index 0000000000000..07a93e19c25c6 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format.cow; + +import org.apache.flink.table.data.vector.writable.WritableBytesVector; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; + +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Fixed length bytes {@code ColumnReader}, just for decimal. + * + *
<p>
Note: Reference Flink release 1.13.2 + * {@code org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader} + * to always write as legacy decimal format. + */ +public class FixedLenBytesColumnReader + extends AbstractColumnReader { + + public FixedLenBytesColumnReader( + ColumnDescriptor descriptor, PageReader pageReader, int precision) throws IOException { + super(descriptor, pageReader); + checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + } + + @Override + protected void readBatch(int rowId, int num, V column) { + int bytesLen = descriptor.getPrimitiveType().getTypeLength(); + WritableBytesVector bytesVector = (WritableBytesVector) column; + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + byte[] bytes = readDataBinary(bytesLen).getBytes(); + bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length); + } else { + bytesVector.setNullAt(rowId + i); + } + } + } + + @Override + protected void readBatchFromDictionaryIds( + int rowId, int num, V column, WritableIntVector dictionaryIds) { + WritableBytesVector bytesVector = (WritableBytesVector) column; + for (int i = rowId; i < rowId + num; ++i) { + if (!bytesVector.isNullAt(i)) { + byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytes(); + bytesVector.appendBytes(i, v, 0, v.length); + } + } + } + + private Binary readDataBinary(int len) { + ByteBuffer buffer = readDataBuffer(len); + if (buffer.hasArray()) { + return Binary.fromConstantByteArray( + buffer.array(), buffer.arrayOffset() + buffer.position(), len); + } else { + byte[] bytes = new byte[len]; + buffer.get(bytes); + return Binary.fromConstantByteArray(bytes); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index 6bb514b429b38..d87a2af706d84 100644 --- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -24,7 +24,6 @@ import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader; import org.apache.flink.formats.parquet.vector.reader.ColumnReader; import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader; -import org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader; import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader; import org.apache.flink.formats.parquet.vector.reader.IntColumnReader; import org.apache.flink.formats.parquet.vector.reader.LongColumnReader; @@ -366,7 +365,7 @@ public static WritableColumnVector createWritableColumnVector( "TIME_MICROS original type is not "); return new HeapTimestampVector(batchSize); case DECIMAL: - checkArgument(typeName == PrimitiveType.PrimitiveTypeName.BINARY + checkArgument(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && primitiveType.getOriginalType() == OriginalType.DECIMAL, "Unexpected type: %s", typeName); return new HeapBytesVector(batchSize);