diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 80fda29aa4756..5187660c8caec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -564,7 +564,7 @@ private static Type convertToParquetType(
         int scale = ((DecimalType) type).getScale();
         int numBytes = computeMinBytesForDecimalPrecision(precision);
         return Types.primitive(
-            PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
+            PrimitiveType.PrimitiveTypeName.BINARY, repetition)
             .precision(precision)
             .scale(scale)
             .length(numBytes)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
index 2bf55b35d4b09..2749f02f36d3b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetDecimalVector.java
@@ -19,12 +19,9 @@
 package org.apache.hudi.table.format.cow;
 
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.data.vector.BytesColumnVector;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.data.vector.DecimalColumnVector;
-import org.apache.flink.table.data.vector.IntColumnVector;
-import org.apache.flink.table.data.vector.LongColumnVector;
 
 /**
  * Parquet write decimal as int32 and int64 and binary, this class wrap the real vector to
@@ -43,22 +40,10 @@ public class ParquetDecimalVector implements DecimalColumnVector {
 
   @Override
   public DecimalData getDecimal(int i, int precision, int scale) {
-    if (DecimalDataUtils.is32BitDecimal(precision)) {
-      return DecimalData.fromUnscaledLong(
-          ((IntColumnVector) vector).getInt(i),
-          precision,
-          scale);
-    } else if (DecimalDataUtils.is64BitDecimal(precision)) {
-      return DecimalData.fromUnscaledLong(
-          ((LongColumnVector) vector).getLong(i),
-          precision,
-          scale);
-    } else {
-      return DecimalData.fromUnscaledBytes(
-          ((BytesColumnVector) vector).getBytes(i).getBytes(),
-          precision,
-          scale);
-    }
+    return DecimalData.fromUnscaledBytes(
+        ((BytesColumnVector) vector).getBytes(i).getBytes(),
+        precision,
+        scale);
   }
 
   @Override
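Note on the two hunks above: decimals are now always written as DECIMAL-annotated BINARY, which stores the big-endian two's-complement bytes of the unscaled value, so the reader no longer needs the int32/int64 fast paths. A minimal JDK-only sketch of the encoding that DecimalData.fromUnscaledBytes decodes (class and variable names here are illustrative, not part of the patch):

    import java.math.BigDecimal;
    import java.math.BigInteger;

    // Illustrative only: the BINARY column stores the big-endian
    // two's-complement bytes of the unscaled decimal value.
    public class UnscaledBytesDemo {
      public static void main(String[] args) {
        BigDecimal value = new BigDecimal("12345678.12"); // fits decimal(10, 2)
        byte[] unscaled = value.unscaledValue().toByteArray();
        // Decoding with the declared scale recovers the original decimal.
        BigDecimal decoded = new BigDecimal(new BigInteger(unscaled), 2);
        System.out.println(decoded); // prints 12345678.12
      }
    }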
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 778598fa67166..63b679d44f29f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -31,7 +31,6 @@
 import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.vector.ColumnVector;
 import org.apache.flink.table.data.vector.VectorizedColumnBatch;
@@ -46,7 +45,6 @@
 import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -197,23 +195,10 @@ private static ColumnVector createVectorFromConstant(
         DecimalData decimal = value == null
             ? null
             : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
-        ColumnVector internalVector;
-        if (DecimalDataUtils.is32BitDecimal(precision)) {
-          internalVector = createVectorFromConstant(
-              new IntType(),
-              decimal == null ? null : (int) decimal.toUnscaledLong(),
-              batchSize);
-        } else if (DecimalDataUtils.is64BitDecimal(precision)) {
-          internalVector = createVectorFromConstant(
-              new BigIntType(),
-              decimal == null ? null : decimal.toUnscaledLong(),
-              batchSize);
-        } else {
-          internalVector = createVectorFromConstant(
-              new VarBinaryType(),
-              decimal == null ? null : decimal.toUnscaledBytes(),
-              batchSize);
-        }
+        ColumnVector internalVector = createVectorFromConstant(
+            new VarBinaryType(),
+            decimal == null ? null : decimal.toUnscaledBytes(),
+            batchSize);
         return new ParquetDecimalVector(internalVector);
       case FLOAT:
         HeapFloatVector fv = new HeapFloatVector(batchSize);
@@ -365,29 +350,10 @@ public static WritableColumnVector createWritableColumnVector(
             "TIME_MICROS original type is not ");
         return new HeapTimestampVector(batchSize);
       case DECIMAL:
-        DecimalType decimalType = (DecimalType) fieldType;
-        if (DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) {
-          checkArgument(
-              (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                  || typeName == PrimitiveType.PrimitiveTypeName.INT32)
-                  && primitiveType.getOriginalType() == OriginalType.DECIMAL,
-              "Unexpected type: %s", typeName);
-          return new HeapIntVector(batchSize);
-        } else if (DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) {
-          checkArgument(
-              (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                  || typeName == PrimitiveType.PrimitiveTypeName.INT64)
-                  && primitiveType.getOriginalType() == OriginalType.DECIMAL,
-              "Unexpected type: %s", typeName);
-          return new HeapLongVector(batchSize);
-        } else {
-          checkArgument(
-              (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                  || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
-                  && primitiveType.getOriginalType() == OriginalType.DECIMAL,
-              "Unexpected type: %s", typeName);
-          return new HeapBytesVector(batchSize);
-        }
+        checkArgument(typeName == PrimitiveType.PrimitiveTypeName.BINARY
+                && primitiveType.getOriginalType() == OriginalType.DECIMAL,
+            "Unexpected type: %s", typeName);
+        return new HeapBytesVector(batchSize);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported now.");
     }
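With the writer pinned to BINARY, the vector factory above can require exactly BINARY plus the DECIMAL original type and always allocate a HeapBytesVector. For context, the writer still sizes the value via computeMinBytesForDecimalPrecision; below is a sketch of the standard rule such a computation follows (the common formula, not code copied from Hudi or Flink):

    // Smallest byte count n whose signed range 2^(8n - 1) - 1 can hold the
    // largest unscaled value for the precision, 10^precision - 1.
    static int minBytesForPrecision(int precision) {
      int numBytes = 1;
      while (Math.pow(2.0, 8.0 * numBytes - 1.0) < Math.pow(10.0, precision)) {
        numBytes++;
      }
      return numBytes; // e.g. precision 9 -> 4 bytes, precision 38 -> 16 bytes
    }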
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 5be603f7838e5..9d0bcabac6aaa 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -349,7 +349,7 @@ void testStreamReadMorTableWithCompactionPlan() throws Exception {
         .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
         // generate compaction plan for each commit
         .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
-        .withPartition(false)
+        .noPartition()
         .end();
 
     streamTableEnv.executeSql(hoodieTableDDL);
@@ -399,7 +399,7 @@ void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_NAME, tableType.name())
-        .withPartition(false)
+        .noPartition()
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
@@ -563,7 +563,7 @@ void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType)
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.TABLE_TYPE, tableType)
-        .withPartition(false)
+        .noPartition()
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
@@ -770,7 +770,7 @@ void testBulkInsertNonPartitionedTable() {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "bulk_insert")
-        .withPartition(false)
+        .noPartition()
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
@@ -854,6 +854,31 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @Test
+  void testWriteReadDecimals() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String createTable = sql("decimals")
+        .field("f0 decimal(3, 2)")
+        .field("f1 decimal(10, 2)")
+        .field("f2 decimal(20, 2)")
+        .field("f3 decimal(38, 18)")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.OPERATION, "bulk_insert")
+        .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
+        .pkField("f0")
+        .noPartition()
+        .end();
+    tableEnv.executeSql(createTable);
+
+    String insertInto = "insert into decimals values\n"
+        + "(1.23, 12345678.12, 12345.12, 123456789.12345)";
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from decimals").execute().collect());
+    assertRowsEquals(result1, "[+I[1.23, 12345678.12, 12345.12, 123456789.123450000000000000]]");
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
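One detail in the new test's expected string: f3 is declared decimal(38, 18), so the literal 123456789.12345 is rescaled to 18 fractional digits on write, which is why the read-back value carries the trailing zeros. A JDK-only check of that rescaling (illustrative names, not part of the patch):

    import java.math.BigDecimal;

    public class ScalePaddingDemo {
      public static void main(String[] args) {
        // Padding the unscaled value out to the declared scale of 18.
        BigDecimal padded = new BigDecimal("123456789.12345").setScale(18);
        System.out.println(padded); // prints 123456789.123450000000000000
      }
    }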
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index b66d55a77c7fa..f9824426aa736 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -32,9 +32,12 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Configurations for the test.
@@ -57,6 +60,9 @@ private TestConfigurations() {
       .fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren())
       .build();
 
+  private static final List<String> FIELDS = ROW_TYPE.getFields().stream()
+      .map(RowType.RowField::asSummaryString).collect(Collectors.toList());
+
   public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
     return getCreateHoodieTableDDL(tableName, options, true, "partition");
   }
@@ -66,15 +72,23 @@ public static String getCreateHoodieTableDDL(
       Map<String, String> options,
       boolean havePartition,
       String partitionField) {
+    return getCreateHoodieTableDDL(tableName, FIELDS, options, havePartition, "uuid", partitionField);
+  }
+
+  public static String getCreateHoodieTableDDL(
+      String tableName,
+      List<String> fields,
+      Map<String, String> options,
+      boolean havePartition,
+      String pkField,
+      String partitionField) {
     StringBuilder builder = new StringBuilder();
-    builder.append("create table " + tableName + "(\n"
-        + "  uuid varchar(20),\n"
-        + "  name varchar(10),\n"
-        + "  age int,\n"
-        + "  ts timestamp(3),\n"
-        + "  `partition` varchar(20),\n"
-        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
-        + ")\n");
+    builder.append("create table ").append(tableName).append("(\n");
+    for (String field : fields) {
+      builder.append("  ").append(field).append(",\n");
+    }
+    builder.append("  PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n")
+        .append(")\n");
     if (havePartition) {
       builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n");
     }
@@ -205,8 +219,10 @@ public static Sql sql(String tableName) {
    */
   public static class Sql {
     private final Map<String, String> options;
-    private String tableName;
+    private final String tableName;
+    private List<String> fields = new ArrayList<>();
     private boolean withPartition = true;
+    private String pkField = "uuid";
     private String partitionField = "partition";
 
     public Sql(String tableName) {
@@ -219,8 +235,13 @@ public Sql option(ConfigOption<?> option, Object val) {
       return this;
     }
 
-    public Sql withPartition(boolean withPartition) {
-      this.withPartition = withPartition;
+    public Sql noPartition() {
+      this.withPartition = false;
+      return this;
+    }
+
+    public Sql pkField(String pkField) {
+      this.pkField = pkField;
       return this;
     }
 
@@ -229,8 +250,17 @@ public Sql partitionField(String partitionField) {
       return this;
     }
 
+    public Sql field(String fieldSchema) {
+      fields.add(fieldSchema);
+      return this;
+    }
+
     public String end() {
-      return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition, this.partitionField);
+      if (this.fields.size() == 0) {
+        this.fields = FIELDS;
+      }
+      return TestConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options,
+          this.withPartition, this.pkField, this.partitionField);
     }
   }
 }
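For reference, a sketch of driving the generalized builder (field, pkField and noPartition are the methods added above; the DDL shape follows the patched getCreateHoodieTableDDL, and the rendering of the options map into the with-clause is assumed to be the existing helper's unchanged behavior):

    String ddl = TestConfigurations.sql("decimals")
        .field("f0 decimal(3, 2)")
        .field("f1 decimal(10, 2)")
        .pkField("f0")
        .noPartition()
        .end();
    // Produces roughly:
    //   create table decimals(
    //     f0 decimal(3, 2),
    //     f1 decimal(10, 2),
    //     PRIMARY KEY(f0) NOT ENFORCED
    //   )
    //   with ( ...options... )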