FlinkOptions.java
@@ -22,13 +22,13 @@
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
+ import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
- import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
- import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -287,7 +287,7 @@ private FlinkOptions() {
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
.key("write.payload.class")
.stringType()
- .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+ .defaultValue(EventTimeAvroPayload.class.getName())
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
+ "This will render any value set for the option in-effective");

@@ -718,7 +718,7 @@ private FlinkOptions() {
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
.defaultValue("jdbc")
.defaultValue("hms")
.withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");

public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
@@ -754,7 +754,7 @@ private FlinkOptions() {
public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions
.key("hive_sync.partition_extractor_class")
.stringType()
- .defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
+ .defaultValue(MultiPartKeysValueExtractor.class.getName())
.withDescription("Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'");

FlinkStreamerConfig.java
@@ -24,7 +24,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
- import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.util.FlinkStateBackendConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -321,8 +321,8 @@ public class FlinkStreamerConfig extends Configuration {
public String hiveSyncPartitionFields = "";

@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'")
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
+ "default 'MultiPartKeysValueExtractor'")
public String hiveSyncPartitionExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();

@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
public Boolean hiveSyncAssumeDatePartition = false;
HoodieTableFactory.java
@@ -19,12 +19,10 @@
package org.apache.hudi.table;

import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
- import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
- import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -38,6 +36,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
+ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -71,7 +70,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
- setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
+ setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);

Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should not be empty.")));
@@ -90,7 +89,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
"Option [path] should not be empty.");
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
- setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
+ setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
return new HoodieTableSink(conf, schema);
}

@@ -154,35 +153,30 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
+ "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
}
- } else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) {
- // if precombine field is specified but payload clazz is default,
- // use DefaultHoodieRecordPayload to make sure the precombine field is always taken for
- // comparing.
- conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
}
}

/**
- * Sets up the config options based on the table definition, for e.g the table name, primary key.
+ * Sets up the config options based on the table definition, for e.g, the table name, primary key.
*
* @param conf The configuration to setup
* @param tableName The table name
* @param conf The configuration to set up
* @param tablePath The table path
* @param table The catalog table
* @param schema The physical schema
*/
private static void setupConfOptions(
Configuration conf,
- String tableName,
+ ObjectIdentifier tablePath,
CatalogTable table,
ResolvedSchema schema) {
// table name
- conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
+ conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// compaction options
setupCompactionOptions(conf);
// hive options
- setupHiveOptions(conf);
+ setupHiveOptions(conf, tablePath);
// read options
setupReadOptions(conf);
// write options
@@ -309,10 +303,12 @@ private static void setupCompactionOptions(Configuration conf) {
/**
* Sets up the hive options from the table definition.
*/
- private static void setupHiveOptions(Configuration conf) {
- if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
- && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
- conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
+ private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) {
+ if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
+ conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
}
+ if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
+ conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
+ }
}
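The reworked setupHiveOptions no longer touches the partition extractor (its default now lives in FlinkOptions) and instead fills hive_sync.db and hive_sync.table from the catalog identifier, but only when the user has not set them explicitly; an explicit value always wins. A minimal standalone sketch of that precedence (illustrative only; the class name and values are placeholders):

// Illustrative sketch of the fallback precedence introduced above.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.hudi.configuration.FlinkOptions;

public class HiveSyncNameFallbackExample {
  // Mirrors the new setupHiveOptions logic: only fill in values the user did not set.
  static void applyFallback(Configuration conf, ObjectIdentifier tablePath) {
    if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
      conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
    }
    if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
      conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
    }
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.HIVE_SYNC_DB, "warehouse_db");        // user-provided value
    applyFallback(conf, ObjectIdentifier.of("hudi", "db1", "t1"));
    System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_DB));    // warehouse_db (explicit value kept)
    System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_TABLE)); // t1 (derived from the identifier)
  }
}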

TestHoodieTableFactory.java
@@ -23,7 +23,6 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
- import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -240,15 +239,21 @@ void testSetupHiveOptionsForSource() {
final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
final Configuration conf1 = tableSource1.getConf();
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));

// set up hive style partitioning is true.
this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);

final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
final Configuration conf2 = tableSource2.getConf();
- assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
}

@Test
@@ -430,15 +435,21 @@ void testSetupHiveOptionsForSink() {
final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
final Configuration conf1 = tableSink1.getConf();
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
+ assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));

// set up hive style partitioning is true.
this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);

final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
final Configuration conf2 = tableSink2.getConf();
- assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
+ assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
}

@Test
@@ -542,7 +553,7 @@ static MockContext getInstance(Configuration conf, ResolvedSchema schema, List<S

@Override
public ObjectIdentifier getObjectIdentifier() {
return ObjectIdentifier.of("hudi", "default", "t1");
return ObjectIdentifier.of("hudi", "db1", "t1");
}

@Override