diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 867aa05b301f8..74cb3e31df079 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieWriterUtils;
 import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.async.HoodieAsyncService;
@@ -76,6 +77,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -651,6 +653,10 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
             + cfg.baseFileFormat);
         cfg.baseFileFormat = baseFileFormat;
         this.cfg.baseFileFormat = baseFileFormat;
+        // Validate the incoming writer properties against the persisted table config before syncing.
+        Map<String, String> propsToValidate = new HashMap<>();
+        properties.get().forEach((k, v) -> propsToValidate.put(k.toString(), v.toString()));
+        HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
       } else {
         tableType = HoodieTableType.valueOf(cfg.tableType);
         if (cfg.baseFileFormat == null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 035ad9b1297a8..f05a36745b8a7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -612,25 +612,16 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     // No new data => no commits.
     cfg.sourceLimit = 0;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
 
     // upsert() #1
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    syncAndAssertRecordCount(cfg, 1950, tableBasePath, "00001", 2);
 
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
@@ -664,6 +655,43 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testModifiedTableConfigs() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // No new data => no commits.
+    cfg.sourceLimit = 0;
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // Add a disallowed update to the record key field; the sync should throw.
+    cfg.sourceLimit = 2000;
+    cfg.operation = WriteOperationType.UPSERT;
+    cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
+    assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1));
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+
+    // Perform the upsert again with the original config; this time the commit should go through.
+    HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    newCfg.sourceLimit = 2000;
+    newCfg.operation = WriteOperationType.UPSERT;
+    syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
+    List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum());
+  }
+
+  private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, totalCommits);
+  }
+
   @ParameterizedTest
   @MethodSource("schemaEvolArgs")
   public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
@@ -1419,7 +1447,7 @@ public void testNullSchemaProvider() throws Exception {
 
   @Test
   public void testPayloadClassUpdate() throws Exception {
-    String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+    String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
@@ -1592,6 +1620,8 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
       populateCommonProps(parquetProps, dfsBasePath);
     }
 
+    parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -2142,7 +2172,7 @@ public void testDeletePartitions() throws Exception {
     // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
     TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
   }
-  
+
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
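For context on the production change above: HoodieDeltaStreamer now converts the incoming writer properties into a map and hands them to HoodieWriterUtils.validateTableConfig, so an attempt to change an immutable table property (such as the record key field) fails before any data is written, which is exactly what testModifiedTableConfigs asserts. Below is a minimal, self-contained sketch of that kind of check. It is not Hudi code; the class, exception, and key names are illustrative assumptions, not the library's actual implementation.

// Illustrative sketch only (not Hudi code): reject writer properties that
// conflict with values already persisted in the table's config.
import java.util.HashMap;
import java.util.Map;

public class TableConfigValidationSketch {

  /** Thrown when an incoming property conflicts with the persisted table config. */
  static class ConfigMismatchException extends RuntimeException {
    ConfigMismatchException(String message) {
      super(message);
    }
  }

  // Hypothetical subset of keys that must not change after table creation.
  private static final String[] IMMUTABLE_KEYS = {
      "hoodie.table.recordkey.fields",
      "hoodie.table.partition.fields",
      "hoodie.table.keygenerator.class"
  };

  static void validate(Map<String, String> incoming, Map<String, String> persisted) {
    for (String key : IMMUTABLE_KEYS) {
      String oldValue = persisted.get(key);
      String newValue = incoming.get(key);
      // Only complain when both sides specify the key and they disagree.
      if (oldValue != null && newValue != null && !oldValue.equals(newValue)) {
        throw new ConfigMismatchException(
            "Config conflict for " + key + ": table has '" + oldValue
                + "' but incoming properties specify '" + newValue + "'");
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> persisted = new HashMap<>();
    persisted.put("hoodie.table.recordkey.fields", "_row_key");

    Map<String, String> incoming = new HashMap<>();
    incoming.put("hoodie.table.recordkey.fields", "differentval");

    try {
      validate(incoming, persisted);
    } catch (ConfigMismatchException e) {
      // Mirrors the assertThrows in the new test: the conflicting sync is rejected.
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}

Validating up front, rather than failing mid-commit, means a rejected sync leaves no partial data behind; the new test checks this by asserting the record count is still 1000 after the failed upsert.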