Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ private static Type convertToParquetType(
int scale = ((DecimalType) type).getScale();
int numBytes = computeMinBytesForDecimalPrecision(precision);
return Types.primitive(
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
PrimitiveType.PrimitiveTypeName.BINARY, repetition)
.precision(precision)
.scale(scale)
.length(numBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
package org.apache.hudi.table.format.cow;

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.vector.BytesColumnVector;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.DecimalColumnVector;
import org.apache.flink.table.data.vector.IntColumnVector;
import org.apache.flink.table.data.vector.LongColumnVector;

/**
 * Parquet may write decimals as int32, int64, or binary; this class wraps the real vector to
Expand All @@ -43,22 +40,10 @@ public class ParquetDecimalVector implements DecimalColumnVector {

@Override
public DecimalData getDecimal(int i, int precision, int scale) {
if (DecimalDataUtils.is32BitDecimal(precision)) {
return DecimalData.fromUnscaledLong(
((IntColumnVector) vector).getInt(i),
precision,
scale);
} else if (DecimalDataUtils.is64BitDecimal(precision)) {
return DecimalData.fromUnscaledLong(
((LongColumnVector) vector).getLong(i),
precision,
scale);
} else {
return DecimalData.fromUnscaledBytes(
((BytesColumnVector) vector).getBytes(i).getBytes(),
precision,
scale);
}
return DecimalData.fromUnscaledBytes(
((BytesColumnVector) vector).getBytes(i).getBytes(),
precision,
scale);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.vector.ColumnVector;
import org.apache.flink.table.data.vector.VectorizedColumnBatch;
Expand All @@ -46,7 +45,6 @@
import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
Expand Down Expand Up @@ -197,23 +195,10 @@ private static ColumnVector createVectorFromConstant(
DecimalData decimal = value == null
? null
: Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
ColumnVector internalVector;
if (DecimalDataUtils.is32BitDecimal(precision)) {
internalVector = createVectorFromConstant(
new IntType(),
decimal == null ? null : (int) decimal.toUnscaledLong(),
batchSize);
} else if (DecimalDataUtils.is64BitDecimal(precision)) {
internalVector = createVectorFromConstant(
new BigIntType(),
decimal == null ? null : decimal.toUnscaledLong(),
batchSize);
} else {
internalVector = createVectorFromConstant(
new VarBinaryType(),
decimal == null ? null : decimal.toUnscaledBytes(),
batchSize);
}
ColumnVector internalVector = createVectorFromConstant(
new VarBinaryType(),
decimal == null ? null : decimal.toUnscaledBytes(),
batchSize);
return new ParquetDecimalVector(internalVector);
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
Expand Down Expand Up @@ -365,29 +350,10 @@ public static WritableColumnVector createWritableColumnVector(
"TIME_MICROS original type is not ");
return new HeapTimestampVector(batchSize);
case DECIMAL:
DecimalType decimalType = (DecimalType) fieldType;
if (DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.INT32)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapIntVector(batchSize);
} else if (DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.INT64)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapLongVector(batchSize);
} else {
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
}
checkArgument(typeName == PrimitiveType.PrimitiveTypeName.BINARY
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
return new HeapBytesVector(batchSize);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ void testStreamReadMorTableWithCompactionPlan() throws Exception {
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
// generate compaction plan for each commit
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.withPartition(false)
.noPartition()
.end();
streamTableEnv.executeSql(hoodieTableDDL);

Expand Down Expand Up @@ -399,7 +399,7 @@ void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_NAME, tableType.name())
.withPartition(false)
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);

Expand Down Expand Up @@ -563,7 +563,7 @@ void testWriteNonPartitionedTable(ExecMode execMode, HoodieTableType tableType)
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
.withPartition(false)
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);

Expand Down Expand Up @@ -770,7 +770,7 @@ void testBulkInsertNonPartitionedTable() {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
.withPartition(false)
.noPartition()
.end();
tableEnv.executeSql(hoodieTableDDL);

Expand Down Expand Up @@ -854,6 +854,31 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}

@Test
void testWriteReadDecimals() {
  // Round-trips decimals across every storage class: small precision (3,2),
  // int-range (10,2), long-range (20,2) and max precision (38,18), so that
  // both the writer's and reader's decimal paths are exercised.
  TableEnvironment tableEnv = batchTableEnv;
  String ddl = sql("decimals")
      .field("f0 decimal(3, 2)")
      .field("f1 decimal(10, 2)")
      .field("f2 decimal(20, 2)")
      .field("f3 decimal(38, 18)")
      .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
      .option(FlinkOptions.OPERATION, "bulk_insert")
      .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
      .pkField("f0")
      .noPartition()
      .end();
  tableEnv.executeSql(ddl);

  String dml = "insert into decimals values\n"
      + "(1.23, 12345678.12, 12345.12, 123456789.12345)";
  execInsertSql(tableEnv, dml);

  // Read everything back; f3 is padded out to its full scale of 18 digits.
  List<Row> readRows = CollectionUtil.iterableToList(
      () -> tableEnv.sqlQuery("select * from decimals").execute().collect());
  assertRowsEquals(readRows, "[+I[1.23, 12345678.12, 12345.12, 123456789.123450000000000000]]");
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Configurations for the test.
Expand All @@ -57,6 +60,9 @@ private TestConfigurations() {
.fields(ROW_TYPE.getFieldNames(), ROW_DATA_TYPE.getChildren())
.build();

private static final List<String> FIELDS = ROW_TYPE.getFields().stream()
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());

public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
return getCreateHoodieTableDDL(tableName, options, true, "partition");
}
Expand All @@ -66,15 +72,23 @@ public static String getCreateHoodieTableDDL(
Map<String, String> options,
boolean havePartition,
String partitionField) {
return getCreateHoodieTableDDL(tableName, FIELDS, options, havePartition, "uuid", partitionField);
}

public static String getCreateHoodieTableDDL(
String tableName,
List<String> fields,
Map<String, String> options,
boolean havePartition,
String pkField,
String partitionField) {
StringBuilder builder = new StringBuilder();
builder.append("create table " + tableName + "(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
+ " age int,\n"
+ " ts timestamp(3),\n"
+ " `partition` varchar(20),\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ ")\n");
builder.append("create table ").append(tableName).append("(\n");
for (String field : fields) {
builder.append(" ").append(field).append(",\n");
}
builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n")
.append(")\n");
if (havePartition) {
builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n");
}
Expand Down Expand Up @@ -205,8 +219,10 @@ public static Sql sql(String tableName) {
*/
public static class Sql {
private final Map<String, String> options;
private String tableName;
private final String tableName;
private List<String> fields = new ArrayList<>();
private boolean withPartition = true;
private String pkField = "uuid";
private String partitionField = "partition";

public Sql(String tableName) {
Expand All @@ -219,8 +235,13 @@ public Sql option(ConfigOption<?> option, Object val) {
return this;
}

public Sql withPartition(boolean withPartition) {
this.withPartition = withPartition;
public Sql noPartition() {
this.withPartition = false;
return this;
}

/**
 * Sets the primary-key column used in the generated {@code PRIMARY KEY(...)} clause.
 *
 * @param field the primary-key field name
 * @return this builder, for chaining
 */
public Sql pkField(String field) {
  this.pkField = field;
  return this;
}

Expand All @@ -229,8 +250,17 @@ public Sql partitionField(String partitionField) {
return this;
}

/**
 * Appends one column definition (e.g. {@code "f0 decimal(3, 2)"}) to the table schema.
 *
 * @param schema the column definition snippet
 * @return this builder, for chaining
 */
public Sql field(String schema) {
  this.fields.add(schema);
  return this;
}

public String end() {
return TestConfigurations.getCreateHoodieTableDDL(this.tableName, options, this.withPartition, this.partitionField);
if (this.fields.size() == 0) {
this.fields = FIELDS;
}
return TestConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options,
this.withPartition, this.pkField, this.partitionField);
}
}
}