diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5187660c8caec..80fda29aa4756 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -564,7 +564,7 @@ private static Type convertToParquetType(
int scale = ((DecimalType) type).getScale();
int numBytes = computeMinBytesForDecimalPrecision(precision);
return Types.primitive(
- PrimitiveType.PrimitiveTypeName.BINARY, repetition)
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
.precision(precision)
.scale(scale)
.length(numBytes)
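
The writer now declares decimals as `FIXED_LEN_BYTE_ARRAY` whose byte length comes from `computeMinBytesForDecimalPrecision(precision)`. The helper itself is not shown in this hunk; below is a minimal sketch of what it presumably computes, assuming the standard Parquet convention (the smallest signed two's-complement width that can hold every unscaled value of the given precision). The class name `DecimalBytesSketch` is illustrative.

```java
import java.math.BigInteger;

public class DecimalBytesSketch {
  // Assumed semantics, per the Parquet spec: the smallest n such that a signed
  // n-byte two's-complement integer can hold 10^precision - 1.
  static int computeMinBytesForDecimalPrecision(int precision) {
    BigInteger maxUnscaled = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    int numBytes = 1;
    while (maxUnscaled.bitLength() + 1 > 8 * numBytes) { // +1 for the sign bit
      numBytes++;
    }
    return numBytes;
  }

  public static void main(String[] args) {
    System.out.println(computeMinBytesForDecimalPrecision(10)); // 5
    System.out.println(computeMinBytesForDecimalPrecision(38)); // 16
  }
}
```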
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java
new file mode 100644
index 0000000000000..07a93e19c25c6
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/FixedLenBytesColumnReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow;
+
+import org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableBytesVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Fixed-length bytes {@code ColumnReader}, used only for decimal columns.
+ *
+ * <p>Note: References Flink release 1.13.2
+ * {@code org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader},
+ * since decimals are always written in the legacy (FIXED_LEN_BYTE_ARRAY) format.
+ */
+public class FixedLenBytesColumnReader<V extends WritableColumnVector>
+    extends AbstractColumnReader<V> {
+
+ public FixedLenBytesColumnReader(
+ ColumnDescriptor descriptor, PageReader pageReader, int precision) throws IOException {
+ super(descriptor, pageReader);
+ checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
+ }
+
+ @Override
+ protected void readBatch(int rowId, int num, V column) {
+ int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+ WritableBytesVector bytesVector = (WritableBytesVector) column;
+ for (int i = 0; i < num; i++) {
+ if (runLenDecoder.readInteger() == maxDefLevel) {
+ byte[] bytes = readDataBinary(bytesLen).getBytes();
+ bytesVector.appendBytes(rowId + i, bytes, 0, bytes.length);
+ } else {
+ bytesVector.setNullAt(rowId + i);
+ }
+ }
+ }
+
+ @Override
+ protected void readBatchFromDictionaryIds(
+ int rowId, int num, V column, WritableIntVector dictionaryIds) {
+ WritableBytesVector bytesVector = (WritableBytesVector) column;
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!bytesVector.isNullAt(i)) {
+ byte[] v = dictionary.decodeToBinary(dictionaryIds.getInt(i)).getBytes();
+ bytesVector.appendBytes(i, v, 0, v.length);
+ }
+ }
+ }
+
+ private Binary readDataBinary(int len) {
+ ByteBuffer buffer = readDataBuffer(len);
+ if (buffer.hasArray()) {
+ return Binary.fromConstantByteArray(
+ buffer.array(), buffer.arrayOffset() + buffer.position(), len);
+ } else {
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ return Binary.fromConstantByteArray(bytes);
+ }
+ }
+}
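
The reader above only copies the raw fixed-length bytes into the `WritableBytesVector`; interpreting them as a decimal happens later in Flink's vector/row accessors. For illustration only, this is the conversion those bytes imply: a big-endian two's-complement unscaled value combined with the column's scale. The class name `FixedLenDecimalDecode` is made up for the example.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class FixedLenDecimalDecode {
  public static void main(String[] args) {
    int scale = 2;
    // 0x04D2 is 1234 as a big-endian two's-complement unscaled value,
    // so at scale 2 it decodes to 12.34.
    byte[] fixedLenBytes = {0x04, (byte) 0xD2};
    BigDecimal value = new BigDecimal(new BigInteger(fixedLenBytes), scale);
    System.out.println(value); // prints 12.34
  }
}
```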
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 6bb514b429b38..d87a2af706d84 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,7 +24,6 @@
import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.FixedLenBytesColumnReader;
import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader;
import org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
@@ -366,7 +365,7 @@ public static WritableColumnVector createWritableColumnVector(
"TIME_MICROS original type is not ");
return new HeapTimestampVector(batchSize);
case DECIMAL:
- checkArgument(typeName == PrimitiveType.PrimitiveTypeName.BINARY
+ checkArgument(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
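
The relaxed `checkArgument` mirrors the writer-side change in `ParquetSchemaConverter`: a decimal column must now arrive as `FIXED_LEN_BYTE_ARRAY` annotated with the `DECIMAL` original type. A small sketch with the parquet-mr builder shows the shape of a type that satisfies both conditions; the column name `price` and class name `DecimalTypeCheckExample` are illustrative.

```java
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class DecimalTypeCheckExample {
  public static void main(String[] args) {
    // decimal(10, 2): 5 bytes are enough for any 10-digit unscaled value.
    Type decimal = Types.primitive(
            PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.OPTIONAL)
        .precision(10)
        .scale(2)
        .length(5)
        .as(OriginalType.DECIMAL)
        .named("price");

    PrimitiveType primitive = decimal.asPrimitiveType();
    // Both conditions from the updated checkArgument hold:
    System.out.println(primitive.getPrimitiveTypeName()); // FIXED_LEN_BYTE_ARRAY
    System.out.println(primitive.getOriginalType());      // DECIMAL
  }
}
```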