Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
Expand Down Expand Up @@ -397,17 +399,22 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
? null : StringUtils.split(pkColumnsStr, ",");
org.apache.flink.table.api.Schema schema;
if (latestTableSchema != null) {
// if the table is initialized from spark, the write schema is nullable for pk columns.
DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
.fromRowDataType(tableDataType);
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
// pkColumns expect not to be null
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
builder.primaryKeyNamed(pkConstraintName, pkColumns);
} else if (pkColumns != null) {
builder.primaryKey(StringUtils.split(pkColumns, ","));
builder.primaryKey(pkColumns);
}
schema = builder.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.util;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
Expand All @@ -26,10 +27,14 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Utilities for {@link org.apache.flink.table.types.DataType}.
Expand Down Expand Up @@ -123,4 +128,39 @@ public static Object resolvePartition(String partition, DataType type) {
"Can not convert %s to type %s for partition value", partition, type));
}
}

/**
* Ensures the give columns of the row data type are not nullable(for example, the primary keys).
*
* @param dataType The row data type
* @param pkColumns The primary keys
* @return a new row data type if any column nullability is tweaked or the original data type
*/
public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this method should be generic no reason to couple it to primary-keys (we can call it for non-PK use-cases as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, thank you for your reviews. This method is used to convert the primary key field into not null type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@waywtdcc i understand that. What i'm saying though is that there's no reason for us to couple it to primary keys (it should be generic to be able to handle any columns)

if (pkColumns == null || pkColumns.isEmpty()) {
return dataType;
}
RowType rowType = (RowType) dataType.getLogicalType();
List<DataType> oriFieldTypes = dataType.getChildren();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: originalFieldTypes

List<String> fieldNames = rowType.getFieldNames();
List<DataType> fieldTypes = new ArrayList<>();
boolean tweaked = false;
for (int i = 0; i < fieldNames.size(); i++) {
if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) {
fieldTypes.add(oriFieldTypes.get(i).notNull());
tweaked = true;
} else {
fieldTypes.add(oriFieldTypes.get(i));
}
}
if (!tweaked) {
return dataType;
}
List<DataTypes.Field> fields = new ArrayList<>();
for (int i = 0; i < fieldNames.size(); i++) {
fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i)));
}
return DataTypes.ROW(fields.toArray(new DataTypes.Field[fields.size()])).notNull();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to do: fields.toArray(DataTypes:Field[]::new)

}

}