From 3e2aa14bb6a278bc83707b1063c9f57f1dc4dcae Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Thu, 15 Sep 2022 16:46:10 +0530
Subject: [PATCH 1/3] [HUDI-4071] Remove default value for mandatory record key field

Fix Spark SQL writer test
---
 .../apache/hudi/config/HoodieIndexConfig.java  |  4 ++--
 .../hudi/config/TestHoodieWriteConfig.java     | 15 +++++++++++----
 ...ConsistentBucketClusteringPlanStrategy.java | 10 ++++++++--
 .../hudi/index/TestHoodieIndexConfigs.java     |  8 ++++++--
 .../bucket/TestHoodieSimpleBucketIndex.java    |  1 +
 .../commit/TestCopyOnWriteActionExecutor.java  |  3 +--
 ...tHoodieSparkMergeOnReadTableCompaction.java | 10 ++++------
 .../hudi/common/config/ConfigProperty.java     |  4 ++--
 .../keygen/constant/KeyGeneratorOptions.java   |  2 +-
 .../quickstart/HoodieSparkQuickstart.java      |  2 +-
 .../quickstart/TestHoodieSparkQuickstart.java  | 10 ----------
 .../org/apache/hudi/DataSourceOptions.scala    |  5 +----
 .../org/apache/hudi/HoodieWriterUtils.scala    |  1 -
 .../catalyst/catalog/HoodieCatalogTable.scala  |  2 +-
 .../spark/sql/hudi/HoodieOptionConfig.scala    |  1 -
 .../apache/hudi/TestDataSourceOptions.scala    |  2 ++
 .../apache/hudi/TestHoodieSparkSqlWriter.scala | 18 +++++++++++++-----
 ... => TestMORDataSourceWithBucketIndex.scala} | 16 +++++++--------
 18 files changed, 61 insertions(+), 53 deletions(-)
 rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/{TestMORDataSourceWithBucket.scala => TestMORDataSourceWithBucketIndex.scala} (99%)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 839131cdb864a..3c8bc636ed20d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -677,10 +677,10 @@ private void validateBucketIndexConfig() {
     // check the bucket index hash field
     if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) {
       hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
-          hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
+          hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
     } else {
       boolean valid = Arrays
-          .stream(hoodieIndexConfig.getStringOrDefault(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
+          .stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
           .collect(Collectors.toSet())
           .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
       if (!valid) {
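Note: with the "uuid" default gone, validateBucketIndexConfig() can only derive the bucket index hash field from an explicitly configured record key. A minimal sketch of the setup this now requires (the path and field name are illustrative; it mirrors the pattern the test changes below adopt):

    import java.util.Properties;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

    // The record key must be set explicitly; there is no implicit "uuid" fallback anymore.
    Properties props = new Properties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi-table")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .fromProperties(props) // seeds the record key for bucket index validation
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
            .build())
        .build();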
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 9d442f8df5432..e7afa50a59d88 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.config.HoodieWriteConfig.Builder;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -374,8 +375,10 @@ public void testAutoConcurrencyConfigAdjustmentWithMetadataTableDisabled(HoodieT
   @Test
   public void testConsistentBucketIndexDefaultClusteringConfig() {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
             .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
         .build();
     assertEquals(HoodieClusteringConfig.SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY, writeConfig.getClusteringPlanStrategyClass());
@@ -384,7 +387,9 @@ public void testConsistentBucketIndexDefaultClusteringConfig() {
 
   @Test
   public void testConsistentBucketIndexInvalidClusteringConfig() {
-    TypedProperties consistentBucketIndexProps = HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
+    TypedProperties consistentBucketIndexProps = HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
         .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build().getProps();
 
     HoodieWriteConfig.Builder writeConfigBuilder = HoodieWriteConfig.newBuilder().withPath("/tmp");
@@ -400,14 +405,16 @@ public void testConsistentBucketIndexInvalidClusteringConfig() {
 
   @Test
   public void testSimpleBucketIndexPartitionerConfig() {
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
             .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build())
         .build();
     assertEquals(HoodieLayoutConfig.SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME, writeConfig.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
 
     HoodieWriteConfig overwritePartitioner = HoodieWriteConfig.newBuilder().withPath("/tmp")
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
             .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
             .build())
         .withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutPartitioner("org.apache.hudi.table.action.commit.UpsertPartitioner").build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
index 810dbc5bc75cc..eec79756bd767 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
@@ -33,6 +33,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -43,6 +44,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -70,8 +72,10 @@ public void tearDown() throws IOException {
   public void testBuildSplitClusteringGroup() throws IOException {
     setup();
     int maxFileSize = 5120;
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
             .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
             .withBucketMaxNum(6)
             .withBucketNum("4").build())
@@ -110,8 +114,10 @@ public void testBuildSplitClusteringGroup() throws IOException {
   public void testBuildMergeClusteringGroup() throws Exception {
     setup();
     int maxFileSize = 5120;
+    Properties props = new Properties();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET)
             .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
             .withBucketMinNum(4)
             .withBucketNum("4").build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
index 4f32f0ec10617..632fabebb7f97 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -39,6 +40,7 @@
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.nio.file.Path;
+import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -88,13 +90,15 @@ public void testCreateIndex(IndexType indexType) {
         assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof SparkHoodieHBaseIndex);
         break;
       case BUCKET:
+        Properties props = new Properties();
+        props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
         config = clientConfigBuilder.withPath(basePath)
-            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
+            .withIndexConfig(indexConfigBuilder.fromProperties(props).withIndexType(IndexType.BUCKET)
                 .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build()).build();
         assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSimpleBucketIndex);
 
         config = HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
+            .withIndexConfig(indexConfigBuilder.fromProperties(props).withIndexType(IndexType.BUCKET)
                 .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
             .build();
         assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSparkConsistentBucketIndex);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
index 34728c6cf368b..575fdfecc4aff 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
@@ -84,6 +84,7 @@ public void testBucketIndexValidityCheck() {
             .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
             .withBucketNum("8").build();
     });
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid");
     HoodieIndexConfig.newBuilder().fromProperties(props)
         .withIndexType(HoodieIndex.IndexType.BUCKET)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index f165b48db05d1..907520bfe219d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -145,18 +145,17 @@ private Properties makeIndexConfig(HoodieIndex.IndexType indexType) {
     Properties props = new Properties();
     HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder()
         .withIndexType(indexType);
-    props.putAll(indexConfig.build().getProps());
     if (indexType.equals(HoodieIndex.IndexType.BUCKET)) {
       props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
       indexConfig.fromProperties(props)
          .withIndexKeyField("_row_key")
          .withBucketNum("1")
          .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
-      props.putAll(indexConfig.build().getProps());
       props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props)
          .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
          .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build().getProps());
     }
+    props.putAll(indexConfig.build().getProps());
     return props;
   }
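Note: the reordering in makeIndexConfig above matters because the properties snapshot must come last. A compressed sketch of the intended order (method and field names from the test; the assumption here is that getProps() reflects the builder's state at build() time, so snapshotting earlier would capture a config without the record key and bucket settings):

    Properties props = new Properties();
    HoodieIndexConfig.Builder indexConfig = HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BUCKET);
    // 1. Put the record key into props first...
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
    // 2. ...feed it to the builder along with the bucket settings...
    indexConfig.fromProperties(props)
        .withIndexKeyField("_row_key")
        .withBucketNum("1")
        .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
    // 3. ...and only then snapshot the fully configured result, exactly once.
    props.putAll(indexConfig.build().getProps());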
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index 6fa4facb56881..ffdd9cb67b6a4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -93,6 +93,7 @@ public void setup() {
   @ParameterizedTest
   @MethodSource("writePayloadTest")
   public void testWriteDuringCompaction(String payloadClass) throws IOException {
+    Properties props = getPropertiesForKeyGen(true);
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
         .withPath(basePath())
@@ -107,10 +108,8 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
         .build();
-
-    Properties props = getPropertiesForKeyGen(true);
     props.putAll(config.getProps());
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
@@ -139,6 +138,7 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
+    Properties props = getPropertiesForKeyGen(true);
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
         .withPath(basePath())
@@ -152,10 +152,8 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean en
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
         .build();
-
-    Properties props = getPropertiesForKeyGen(true);
     props.putAll(config.getProps());
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
index 934803d8d315e..08f36512c9150 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
@@ -27,9 +27,9 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.Objects;
 
 /**
 * ConfigProperty describes a configuration property. It contains the configuration
@@ -76,7 +76,7 @@ public String key() {
 
   public T defaultValue() {
     if (defaultValue == null) {
-      throw new HoodieException("There's no default value for this config");
+      throw new HoodieException(String.format("There's no default value for this config: %s", key));
     }
     return defaultValue;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
index ff182c4c1661f..a2be94a453e10 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -45,7 +45,7 @@ public class KeyGeneratorOptions extends HoodieConfig {
 
   public static final ConfigProperty<String> RECORDKEY_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.recordkey.field")
-      .defaultValue("uuid")
+      .noDefaultValue()
       .withDocumentation("Record key field. Value to be used as the `recordKey` component of `HoodieKey`.\n"
          + "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
          + "the dot notation eg: `a.b.c`");
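Note: because the record key is now declared with noDefaultValue(), any remaining call to defaultValue() on it fails fast instead of silently falling back to "uuid", and the ConfigProperty change above makes the failure name the offending key. A minimal sketch of the assumed behavior:

    import org.apache.hudi.common.config.ConfigProperty;
    import org.apache.hudi.exception.HoodieException;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

    ConfigProperty<String> recordKey = KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
    try {
      recordKey.defaultValue(); // no default is declared for this property anymore
    } catch (HoodieException e) {
      // "There's no default value for this config: hoodie.datasource.write.recordkey.field"
      System.err.println(e.getMessage());
    }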
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
index 9c6293fe4471e..325aad437ade7 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/quickstart/HoodieSparkQuickstart.java
@@ -210,7 +210,7 @@ public static Dataset<Row> delete(SparkSession spark, String tablePath, String t
     df.write().format("org.apache.hudi")
         .options(QuickstartUtils.getQuickstartWriteConfigs())
         .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts")
-        .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "uuid")
+        .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
         .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
         .option(TBL_NAME.key(), tableName)
         .option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index c23db7f8e7106..823d8ccd13685 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -21,8 +21,6 @@
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
 import org.apache.spark.SparkConf;
@@ -37,15 +35,7 @@
 import java.io.File;
 import java.nio.file.Paths;
 
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.delete;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.deleteByPartition;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.incrementalQuery;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertData;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData;
 import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.runQuickstart;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData;
 
 public class TestHoodieSparkQuickstart implements SparkProvider {
   protected static HoodieSparkEngineContext context;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 57cab09377fa1..6d66b4189a3ac 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -612,9 +612,6 @@
   val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()
   /** @deprecated Use {@link RECORDKEY_FIELD} and its methods instead */
   @Deprecated
-  val DEFAULT_RECORDKEY_FIELD_OPT_VAL = RECORDKEY_FIELD.defaultValue()
-  /** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
-  @Deprecated
   val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()
   /** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
   @Deprecated
@@ -794,7 +791,7 @@
 
   def inferKeyGenClazz(props: TypedProperties): String = {
     val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
-    val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+    val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), null)
     inferKeyGenClazz(recordsKeyFields, partitionFields)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 335fe68bd2099..54bed6de7b3b8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -58,7 +58,6 @@
     hoodieConfig.setDefaultValue(TABLE_TYPE)
     hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
     hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
-    hoodieConfig.setDefaultValue(RECORDKEY_FIELD)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
     hoodieConfig.setDefaultValue(ENABLE)
     hoodieConfig.setDefaultValue(COMMIT_METADATA_KEYPREFIX)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index c31cd3b20565e..71a8c2298bcc2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -294,7 +294,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
       HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
         originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
     } else {
-      val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.defaultValue.get)
+      val primaryKeys = table.properties(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
       val partitions = table.partitionColumnNames.mkString(",")
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 732367cf5a5e5..7efb60ae0b6a5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -46,7 +46,6 @@ object HoodieOptionConfig {
     .withSqlKey("primaryKey")
     .withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
     .withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
-    .defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
     .build()
 
   val SQL_KEY_TABLE_TYPE: HoodieSQLOption[String] = buildConf()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
index 9920aa80baf09..302ddf38682ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test
 class TestDataSourceOptions {
   @Test def inferDataSourceOptions(): Unit = {
     val inputOptions1 = Map(
+      RECORDKEY_FIELD.key -> "uuid",
       TABLE_NAME.key -> "hudi_table",
       PARTITIONPATH_FIELD.key -> "year,month"
     )
@@ -38,6 +39,7 @@ class TestDataSourceOptions {
       modifiedOptions1(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key))
 
     val inputOptions2 = Map(
+      RECORDKEY_FIELD.key -> "uuid",
       TABLE_NAME.key -> "hudi_table",
       PARTITIONPATH_FIELD.key -> "year",
       HIVE_STYLE_PARTITIONING.key -> "true"
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 4e4fe43ff94b3..84ce11468942f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -261,14 +261,22 @@ class TestHoodieSparkSqlWriter {
   @Test
   def testThrowExceptionAlreadyExistsWithAppendSaveMode(): Unit = {
     //create a new table
-    val fooTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4")
+    val fooTableModifier = Map(
+      "path" -> tempBasePath,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "uuid",
"hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4") val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, dataFrame) //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception - val barTableModifier = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl", - "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4") + val barTableModifier = Map( + "path" -> tempBasePath, + HoodieWriteConfig.TBL_NAME.key -> "hoodie_bar_tbl", + "hoodie.datasource.write.recordkey.field" -> "uuid", + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4") val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime))) val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableModifier, dataFrame2)) assert(tableAlreadyExistException.getMessage.contains("Config conflict")) @@ -864,7 +872,7 @@ class TestHoodieSparkSqlWriter { | dt string | ) using hudi | partitioned by (dt) - | options ( + | tblproperties ( | primaryKey = 'id' | ) | location '$tablePath1' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala index 57ebd038f2ae1..187de2d8e0671 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala @@ -17,27 +17,25 @@ package org.apache.hudi.functional -import scala.collection.JavaConversions._ - import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig} -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig} import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner +import org.apache.hudi.table.storage.HoodieStorageLayout import org.apache.hudi.testutils.HoodieClientTestBase - +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.spark.sql._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} -import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner -import org.apache.hudi.table.storage.HoodieStorageLayout +import scala.collection.JavaConversions._ /** * */ -class TestDataSourceForBucketIndex extends HoodieClientTestBase { +class TestMORDataSourceWithBucketIndex extends 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
similarity index 99%
rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
index 57ebd038f2ae1..187de2d8e0671 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucket.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceWithBucketIndex.scala
@@ -17,27 +17,25 @@
 
 package org.apache.hudi.functional
 
-import scala.collection.JavaConversions._
-
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig}
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieLayoutConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner
+import org.apache.hudi.table.storage.HoodieStorageLayout
 import org.apache.hudi.testutils.HoodieClientTestBase
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
-import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner
-import org.apache.hudi.table.storage.HoodieStorageLayout
+import scala.collection.JavaConversions._
 
 /**
  *
 */
-class TestDataSourceForBucketIndex extends HoodieClientTestBase {
+class TestMORDataSourceWithBucketIndex extends HoodieClientTestBase {
 
   var spark: SparkSession = null
   val commonOpts = Map(

From d8d2f37881c49affe448401ed39020f1d0f22d37 Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Fri, 4 Nov 2022 14:32:59 +0530
Subject: [PATCH 2/3] Take sql primaryKey from catalog if available

---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  | 13 +++++++------
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |  2 +-
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 71a8c2298bcc2..7bd6dd2244f4f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
@@ -277,16 +278,16 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
           String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))
       }
-      if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) {
-        extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
-          originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
+      if (originTableConfig.contains(URL_ENCODE_PARTITIONING.key)) {
+        extraConfig(URL_ENCODE_PARTITIONING.key) =
+          originTableConfig(URL_ENCODE_PARTITIONING.key)
       } else {
-        extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
+        extraConfig(URL_ENCODE_PARTITIONING.key) =
           String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))
       }
     } else {
       extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true"
-      extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()
+      extraConfig(URL_ENCODE_PARTITIONING.key) = URL_ENCODE_PARTITIONING.defaultValue()
     }
 
     if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
@@ -294,7 +295,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
           originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
     } else {
-      val primaryKeys = table.properties(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
+      val primaryKeys = table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, table.storage.properties(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName))
       val partitions = table.partitionColumnNames.mkString(",")
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
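Note: with this change the key generator inference reads primaryKey from table.properties first and falls back to table.storage.properties, so both SQL spellings reach the same code path. A minimal sketch (assuming an active SparkSession spark; table names and locations are illustrative):

    // Both forms are expected to resolve the primary key for keygen inference:
    spark.sql("create table h0 (id int, name string, dt string) using hudi"
        + " partitioned by (dt) tblproperties (primaryKey = 'id') location '/tmp/h0'");
    spark.sql("create table h1 (id int, name string, dt string) using hudi"
        + " partitioned by (dt) options (primaryKey = 'id') location '/tmp/h1'");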
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 84ce11468942f..34961829db505 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -872,7 +872,7 @@ class TestHoodieSparkSqlWriter {
         | dt string
         | ) using hudi
         | partitioned by (dt)
-        | tblproperties (
+        | options (
         | primaryKey = 'id'
         | )
         | location '$tablePath1'

From 11ed2543d0fc2b79b3cf25a195910442148c65bb Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Fri, 4 Nov 2022 15:59:35 +0530
Subject: [PATCH 3/3] Fix bucket index test

---
 .../apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
index 575fdfecc4aff..12a5cc6a1a2e8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
@@ -78,13 +78,13 @@ public void tearDown() throws Exception {
   public void testBucketIndexValidityCheck() {
     Properties props = new Properties();
     props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "_row_key");
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     assertThrows(HoodieIndexException.class, () -> {
       HoodieIndexConfig.newBuilder().fromProperties(props)
           .withIndexType(HoodieIndex.IndexType.BUCKET)
           .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
          .withBucketNum("8").build();
     });
-    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
     props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid");
     HoodieIndexConfig.newBuilder().fromProperties(props)
         .withIndexType(HoodieIndex.IndexType.BUCKET)
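Note: moving the record key ahead of the assertThrows keeps it set for the whole test, so the assertion now exercises only the rule from validateBucketIndexConfig(): the bucket index hash field must be a subset of the record key fields. A minimal sketch of that rule (mirroring the test above; field names are illustrative):

    Properties props = new Properties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
    props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "_row_key");
    // "_row_key" is not among the record key fields ("uuid"), so building the
    // bucket index config is expected to fail with HoodieIndexException.
    assertThrows(HoodieIndexException.class, () ->
        HoodieIndexConfig.newBuilder().fromProperties(props)
            .withIndexType(HoodieIndex.IndexType.BUCKET)
            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
            .withBucketNum("8").build());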