diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 6f15dea978f02..dcf56f32bcc02 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 public class InputBatch {
@@ -48,6 +49,9 @@ public String getCheckpointForNextBatch() {
   }
 
   public SchemaProvider getSchemaProvider() {
+    if (schemaProvider == null) {
+      throw new HoodieException("Please provide a valid schema provider class!");
+    }
     return schemaProvider;
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 0092de0ead6f5..5cdb532dca8fe 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,12 +47,14 @@
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.DatasetNotFoundException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
@@ -169,7 +172,12 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
-        String propsFilename, boolean enableHiveSync) {
+        String propsFilename, boolean enableHiveSync) {
+      return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
+        String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -181,12 +189,14 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
      cfg.sourceOrderingField = "timestamp";
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
       cfg.sourceLimit = 1000;
-      cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+      if (useSchemaProviderClass) {
+        cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+      }
       return cfg;
     }
 
     static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
-        boolean addReadLatestOnMissingCkpt) {
+        boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips_copy";
@@ -196,6 +206,9 @@ static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, S
       cfg.sourceOrderingField = "timestamp";
       cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
       cfg.sourceLimit = 1000;
+      if (null != schemaProviderClassName) {
+        cfg.schemaProviderClassName = schemaProviderClassName;
+      }
       List<String> cfgs = new ArrayList<>();
       cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
       cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
@@ -412,7 +425,8 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
 
     // Now incrementally pull from the above hudi table and ingest to second table
     HoodieDeltaStreamer.Config downstreamCfg =
-        TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT, true);
+        TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT,
+            true, null);
     new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -428,7 +442,10 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
     TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
 
     // with no change in upstream table, no change in downstream too when pulled.
-    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+    HoodieDeltaStreamer.Config downstreamCfg1 =
+        TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath,
+            Operation.BULK_INSERT, true, DummySchemaProvider.class.getName());
+    new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
     TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -447,7 +464,8 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
 
     // Incrementally pull changes in upstream hudi table and apply to downstream table
     downstreamCfg =
-        TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT, false);
+        TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT,
+            false, null);
     downstreamCfg.sourceLimit = 2000;
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
     TestHelpers.assertRecordCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -467,6 +485,21 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
         hiveClient.getLastCommitTimeSynced().get());
   }
 
+  @Test
+  public void testNullSchemaProvider() throws Exception {
+    String dataSetBasePath = dfsBasePath + "/test_dataset";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
+        SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
+        false);
+    try {
+      new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+      fail("Should error out when schema provider is not provided");
+    } catch (HoodieException e) {
+      log.error("Expected error during reading data from source ", e);
+      assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
+    }
+  }
+
   @Test
   public void testFilterDupes() throws Exception {
     String datasetBasePath = dfsBasePath + "/test_dupes_dataset";
@@ -577,4 +610,16 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset
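The final hunk is cut off above, so the body of the DummySchemaProvider that the 2-step pipeline test wires in is not shown. Going by the imports this patch adds to the test file (org.apache.avro.Schema, org.apache.hudi.utilities.schema.SchemaProvider), it is presumably a trivial schema provider stub. A minimal sketch of such a stub follows, assuming the standard SchemaProvider(TypedProperties, JavaSparkContext) constructor; this is illustrative, not necessarily the exact class from the truncated hunk:

  public static class DummySchemaProvider extends SchemaProvider {

    public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) {
      super(props, jssc);
    }

    @Override
    public Schema getSourceSchema() {
      // Any non-null Schema suffices: the HoodieIncrSource path reads the
      // schema from the upstream Hudi table itself, and the stub exists only
      // so that the new null check in InputBatch#getSchemaProvider passes.
      return Schema.create(Schema.Type.NULL);
    }
  }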