Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ private void populateTableExecutionContextList(TypedProperties properties, Strin
Helpers.deepCopyConfigs(config, cfg);
String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
if (cfg.enableHiveSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
throw new HoodieException("Hive sync table field not provided!");
if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(), ""))) {
throw new HoodieException("Meta sync table field not provided!");
}
populateSchemaProviderProps(cfg, tableProperties);
executionContext = new TableExecutionContext();
Expand Down Expand Up @@ -180,6 +180,7 @@ static String getTableWithDatabase(TableExecutionContext context) {

static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tableConfig) {
tableConfig.enableHiveSync = globalConfig.enableHiveSync;
tableConfig.enableMetaSync = globalConfig.enableMetaSync;
tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
tableConfig.sourceClassName = globalConfig.sourceClassName;
Expand Down Expand Up @@ -207,6 +208,11 @@ static void deepCopyConfigs(Config globalConfig, HoodieDeltaStreamer.Config tabl

public static void main(String[] args) throws IOException {
final Config config = new Config();

if (config.enableHiveSync) {
logger.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
}

JCommander cmd = new JCommander(config, null, args);
if (config.help || args.length == 0) {
cmd.usage();
Expand Down Expand Up @@ -292,6 +298,9 @@ public static class Config implements Serializable {
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
public Boolean enableHiveSync = false;

@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
public Boolean enableMetaSync = false;

@Parameter(names = {"--max-pending-compactions"},
description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unless"
+ "outstanding compactions is less than this number")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamerBa

static class TestHelpers {

static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync) {
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, true, "multi_table_dataset");
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync) {
return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset");
}

static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync,
static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
boolean setSchemaProvider, String basePathPrefix) {
HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
config.configFolder = configFolder;
Expand All @@ -67,13 +67,14 @@ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String co
config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
config.enableHiveSync = enableHiveSync;
config.enableMetaSync = enableMetaSync;
return config;
}
}

@Test
public void testInvalidHiveSyncProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
Exception e = assertThrows(HoodieException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when hive sync table not provided with enableHiveSync flag");
Expand All @@ -83,7 +84,7 @@ public void testInvalidHiveSyncProps() throws IOException {

@Test
public void testInvalidPropsFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid props file is provided");
Expand All @@ -93,7 +94,7 @@ public void testInvalidPropsFilePath() throws IOException {

@Test
public void testInvalidTableConfigFilePath() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
Exception e = assertThrows(IllegalArgumentException.class, () -> {
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Should fail when invalid table config props file path is provided");
Expand All @@ -103,7 +104,7 @@ public void testInvalidTableConfigFilePath() throws IOException {

@Test
public void testCustomConfigProps() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
assertEquals(2, streamer.getTableExecutionContexts().size());
Expand All @@ -119,7 +120,7 @@ public void testCustomConfigProps() throws IOException {
@Disabled
public void testInvalidIngestionProps() {
Exception e = assertThrows(Exception.class, () -> {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
new HoodieMultiTableDeltaStreamer(cfg, jsc);
}, "Creation of execution object should fail without kafka topic");
log.debug("Creation of execution object failed with error: " + e.getMessage(), e);
Expand All @@ -138,7 +139,7 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));

HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
TypedProperties properties = executionContexts.get(1).getProperties();
Expand Down Expand Up @@ -187,7 +188,7 @@ public void testMultiTableExecutionWithParquetSource() throws IOException {
// add only common props. later we can add per table props
String parquetPropsFile = populateCommonPropsAndWriteToFile();

HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false,
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false,
false, "multi_table_parquet");
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);

Expand Down Expand Up @@ -218,7 +219,7 @@ public void testMultiTableExecutionWithParquetSource() throws IOException {

@Test
public void testTableLevelProperties() throws IOException {
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false);
HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
tableExecutionContexts.forEach(tableExecutionContext -> {
Expand Down