HoodieMultiTableDeltaStreamer.java
@@ -51,6 +51,9 @@
 import java.util.Objects;
 import java.util.Set;
 
+import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP;
+import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.TARGET_SCHEMA_REGISTRY_URL_PROP;
+
 /**
  * Wrapper over HoodieDeltaStreamer.java class.
  * Helps with ingesting incremental data into hoodie datasets for multiple tables.
@@ -152,19 +155,38 @@ private List<String> getTablesToBeIngested(TypedProperties properties) {
 
   private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, TypedProperties typedProperties) {
     if (Objects.equals(cfg.schemaProviderClassName, SchemaRegistryProvider.class.getName())) {
+      populateSourceRegistryProp(typedProperties);
+      populateTargetRegistryProp(typedProperties);
+    }
+  }
+
+  private void populateTargetRegistryProp(TypedProperties typedProperties) {
+    String schemaRegistryTargetUrl = typedProperties.getString(TARGET_SCHEMA_REGISTRY_URL_PROP, null);
+    if (StringUtils.isNullOrEmpty(schemaRegistryTargetUrl)) {
       String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
       String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
-      String sourceSchemaRegistrySuffix;
       String targetSchemaRegistrySuffix;
       if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
-        sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
         targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
       } else {
         targetSchemaRegistrySuffix = schemaRegistrySuffix;
+      }
+      typedProperties.setProperty(TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
+    }
+  }
+
+  private void populateSourceRegistryProp(TypedProperties typedProperties) {
+    String schemaRegistrySourceUrl = typedProperties.getString(SRC_SCHEMA_REGISTRY_URL_PROP, null);
+    if (StringUtils.isNullOrEmpty(schemaRegistrySourceUrl)) {
+      String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
+      String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+      String sourceSchemaRegistrySuffix;
+      if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
+        sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
+      } else {
         sourceSchemaRegistrySuffix = schemaRegistrySuffix;
       }
-      typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
-      typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
+      typedProperties.setProperty(SRC_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
     }
   }
 
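In effect, both helpers now derive the registry subject URL from the table's Kafka topic only when no explicit URL is configured. Below is a minimal, self-contained sketch of that fallback; the class and method names are illustrative only (not part of the patch), and the fixture values are taken from the test properties files further down:

public class RegistryUrlDerivationExample {

  // Mirrors the fallback in populateSourceRegistryProp/populateTargetRegistryProp:
  // an explicitly configured URL wins, otherwise URL = baseUrl + kafkaTopic + suffix.
  static String resolve(String explicitUrl, String baseUrl, String kafkaTopic, String suffix) {
    if (explicitUrl != null && !explicitUrl.isEmpty()) {
      return explicitUrl;
    }
    return baseUrl + kafkaTopic + suffix;
  }

  public static void main(String[] args) {
    // topic2 table: only baseUrl and urlSuffix are configured, so the URL is derived.
    System.out.println(resolve(null,
        "http://localhost:8081/subjects/", "topic2", "-value/versions/latest"));
    // -> http://localhost:8081/subjects/topic2-value/versions/latest

    // topic1 table: registry.url is set explicitly and passes through unchanged.
    System.out.println(resolve(
        "http://localhost:8081/subjects/random-value/versions/latest", null, null, null));
    // -> http://localhost:8081/subjects/random-value/versions/latest
  }
}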
@@ -397,8 +419,6 @@ public void sync() {
 
   public static class Constants {
     public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
-    private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
-    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
     public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
     private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
     private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
SchemaRegistryProvider.java
@@ -50,7 +50,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
   public static class Config {
 
     public static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
-    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
+    public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
         "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
   }
 
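Making TARGET_SCHEMA_REGISTRY_URL_PROP public (matching SRC_SCHEMA_REGISTRY_URL_PROP) lets HoodieMultiTableDeltaStreamer statically import both property keys from SchemaRegistryProvider.Config rather than carrying duplicate private string constants in its own Constants class, leaving the key strings with a single owner.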
TestHoodieMultiTableDeltaStreamer.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
 import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
 import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
@@ -49,12 +50,13 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
 
   static class TestHelpers {
 
-    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync) {
-      return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset");
+    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
+                                                          Class<?> clazz) {
+      return getConfig(fileName, configFolder, sourceClassName, enableHiveSync, enableMetaSync, true, "multi_table_dataset", clazz);
     }
 
     static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String configFolder, String sourceClassName, boolean enableHiveSync, boolean enableMetaSync,
-                                                          boolean setSchemaProvider, String basePathPrefix) {
+                                                          boolean setSchemaProvider, String basePathPrefix, Class<?> clazz) {
       HoodieMultiTableDeltaStreamer.Config config = new HoodieMultiTableDeltaStreamer.Config();
       config.configFolder = configFolder;
       config.targetTableName = "dummy_table";
@@ -64,7 +66,7 @@ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String co
       config.sourceClassName = sourceClassName;
       config.sourceOrderingField = "timestamp";
       if (setSchemaProvider) {
-        config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+        config.schemaProviderClassName = clazz != null ? clazz.getName() : FilebasedSchemaProvider.class.getName();
       }
       config.enableHiveSync = enableHiveSync;
       config.enableMetaSync = enableMetaSync;
@@ -74,7 +76,7 @@ static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, String co
 
   @Test
   public void testInvalidHiveSyncProps() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
     Exception e = assertThrows(HoodieException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when hive sync table not provided with enableHiveSync flag");
@@ -84,7 +86,7 @@ public void testInvalidHiveSyncProps() throws IOException {
 
   @Test
   public void testInvalidPropsFilePath() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
     Exception e = assertThrows(IllegalArgumentException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when invalid props file is provided");
@@ -94,7 +96,7 @@ public void testInvalidPropsFilePath() throws IOException {
 
   @Test
   public void testInvalidTableConfigFilePath() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
     Exception e = assertThrows(IllegalArgumentException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when invalid table config props file path is provided");
@@ -104,7 +106,7 @@ public void testInvalidTableConfigFilePath() throws IOException {
 
   @Test
   public void testCustomConfigProps() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     TableExecutionContext executionContext = streamer.getTableExecutionContexts().get(1);
     assertEquals(2, streamer.getTableExecutionContexts().size());
@@ -114,13 +116,16 @@ public void testCustomConfigProps() throws IOException {
     assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
     assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
     assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP));
+    assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest",
+        streamer.getTableExecutionContexts().get(0).getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
   }
 
   @Test
   @Disabled
   public void testInvalidIngestionProps() {
     Exception e = assertThrows(Exception.class, () -> {
-      HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true);
+      HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), true, true, null);
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Creation of execution object should fail without kafka topic");
     log.debug("Creation of execution object failed with error: " + e.getMessage(), e);
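The two new assertions exercise both resolution paths: the uber table (topic1) sets hoodie.deltastreamer.schemaprovider.registry.url explicitly, so its random-value URL passes through untouched, while the short-trip table (topic2) sets only the base URL and suffix, so its subject URL is derived from the topic name (see the fixture changes below).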
@@ -139,7 +144,7 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
     testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
     testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false, null);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
     TypedProperties properties = executionContexts.get(1).getProperties();
@@ -189,7 +194,7 @@ public void testMultiTableExecutionWithParquetSource() throws IOException {
     String parquetPropsFile = populateCommonPropsAndWriteToFile();
 
     HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", ParquetDFSSource.class.getName(), false, false,
-        false, "multi_table_parquet");
+        false, "multi_table_parquet", null);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
 
     List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
@@ -219,7 +224,7 @@
 
   @Test
   public void testTableLevelProperties() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false);
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", TestDataSource.class.getName(), false, false, null);
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> tableExecutionContexts = streamer.getTableExecutionContexts();
     tableExecutionContexts.forEach(tableExecutionContext -> {
short_trip_uber_config.properties
@@ -22,4 +22,6 @@ hoodie.deltastreamer.source.kafka.topic=topic2
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
-hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
\ No newline at end of file
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
+hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
+hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
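With these two additions the short-trip table needs no per-table registry URL: the provider derives http://localhost:8081/subjects/topic2-value/versions/latest from this file's topic, the exact value testCustomConfigProps asserts.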
uber_config.properties
@@ -22,4 +22,6 @@ hoodie.deltastreamer.source.kafka.topic=topic1
 hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
 hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
 hoodie.datasource.hive_sync.database=uber_hive_db
-hoodie.datasource.hive_sync.table=uber_hive_dummy_table
\ No newline at end of file
+hoodie.datasource.hive_sync.table=uber_hive_dummy_table
+hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
+hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest
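Here, by contrast, the explicit registry.url and registry.targetUrl values short-circuit the baseUrl/urlSuffix fallback, so the configured random-value URLs are used as-is.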