diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index a7bf353536bc8..7e49d9b88f69f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -128,8 +128,9 @@ private void populateTableExecutionContextList(TypedProperties properties, Strin
       Helpers.deepCopyConfigs(config, cfg);
       String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
       cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
-        throw new HoodieException("Hive sync table field not provided!");
+      // --enable-hive-sync is still an accepted flag, so the sync table name is validated when either flag is set.
+      if ((cfg.enableHiveSync || cfg.enableMetaSync) && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
+        throw new HoodieException("Meta sync table field not provided!");
       }
       populateSchemaProviderProps(cfg, tableProperties);
       executionContext = new TableExecutionContext();
@@ -180,6 +181,7 @@ static String getTableWithDatabase(TableExecutionContext context) {
 
     static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
       tableConfig.enableHiveSync = globalConfig.enableHiveSync;
+      tableConfig.enableMetaSync = globalConfig.enableMetaSync;
       tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
       tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
       tableConfig.sourceClassName = globalConfig.sourceClassName;
@@ -207,6 +209,12 @@ static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tabl
 
   public static void main(String[] args) throws IOException {
     final Config config = new Config();
     JCommander cmd = new JCommander(config, null, args);
+
+    // Checked only after JCommander has populated config from args; on a freshly constructed Config the flag is always false.
+    if (config.enableHiveSync) {
+      logger.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
+    }
+
     if (config.help || args.length == 0) {
       cmd.usage();
@@ -292,6 +300,9 @@ public static class Config implements Serializable {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
+    public Boolean enableMetaSync = false;
+
     @Parameter(names = {"--max-pending-compactions"},
         description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
             + "outstanding compactions is less than this number")
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 3f2e86e2438b1..7162fb7689f03 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -49,11 +49,11 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa
 
   static class TestHelpers {
 
-    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) {
-      return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, true, "multi_table_dataset");
+    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync) {
+      return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset");
     }
 
-    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync,
+    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
                                                           boolean setSchemaProvider, String basePathPrefix) {
       HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
       config.configFolder = configFolder;
@@ -67,13 +67,14 @@ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String co
         config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
       }
       config.enableHiveSync = enableHiveSync;
+      config.enableMetaSync = enableMetaSync;
       return config;
     }
   }
 
   @Test
   public void testInvalidHiveSyncProps() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
     Exception e = assertThrows(HoodieException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when hive sync table not provided with enableHiveSync flag");
@@ -83,7 +84,7 @@ public void testInvalidHiveSyncProps() throws IOException {
 
   @Test
   public void testInvalidPropsFilePath() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
     Exception e = assertThrows(IllegalArgumentException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when invalid props file is provided");
@@ -93,7 +94,7 @@ public void testInvalidPropsFilePath() throws IOException {
 
   @Test
   public void testInvalidTableConfigFilePath() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
     Exception e = assertThrows(IllegalArgumentException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when invalid table config props file path is provided");
@@ -103,7 +104,7 @@ public void testInvalidTableConfigFilePath() throws IOException {
 
   @Test
   public void testCustomConfigProps() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
     assertEquals(2, streamer.getTableExecutionContexts().size());
@@ -119,7 +120,7 @@ public void testCustomConfigProps() throws IOException {
   @Disabled
   public void testInvalidIngestionProps() {
     Exception e = assertThrows(Exception.class, () -> {
-      HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
+      HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Creation of execution object should fail without kafka topic");
     log.debug("Creation of execution object failed with error: " + e.getMessage(), e);
@@ -138,7 +139,7 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
     testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
     testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
     TypedProperties properties = executionContexts.get(1).getProperties();
@@ -187,7 +188,7 @@ public void testMultiTableExecutionWithParquetSource() throws IOException {
     // add only common props. later we can add per table props
     String parquetPropsFile = populateCommonPropsAndWriteToFile();
 
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false,
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false,
         false, "multi_table_parquet");
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
 
@@ -218,7 +219,7 @@ public void testMultiTableExecutionWithParquetSource() throws IOException {
 
   @Test
   public void testTableLevelProperties() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
     tableExecutionContexts.forEach(tableExecutionContext -> {