Merged: changes from all commits
InputBatch.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;

public class InputBatch<T> {
@@ -48,6 +49,9 @@ public String getCheckpointForNextBatch() {
}

public SchemaProvider getSchemaProvider() {
if (schemaProvider == null) {
throw new HoodieException("Please provide a valid schema provider class!");
}
Comment on lines +52 to +54
Contributor:

This change affects TestHoodieDeltaStreamer#testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline, so that UT should be updated as well. You can check the Travis details below for the failure message.

Contributor Author:

Working on it.

Contributor Author:

@leesf Done with the changes, please review.

return schemaProvider;
}
}
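
With this guard in place, a caller that reads the schema from an InputBatch built without a schema provider now fails fast with a descriptive message instead of hitting a NullPointerException further down the pipeline. Below is a minimal sketch of what such a caller observes; the two-argument InputBatch construction is an assumption made only for illustration:

import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.sources.InputBatch;

public class SchemaProviderGuardSketch {
  public static void main(String[] args) {
    // Assumed constructor: a batch carrying no schema provider (illustrative only).
    InputBatch<String> batch = new InputBatch<>(Option.empty(), "checkpoint-000");
    try {
      batch.getSchemaProvider();
    } catch (HoodieException e) {
      // Fails fast with "Please provide a valid schema provider class!"
      System.err.println(e.getMessage());
    }
  }
}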
TestHoodieDeltaStreamer.java
@@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,12 +47,14 @@
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.DatasetNotFoundException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
@@ -169,7 +172,12 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
}

static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync) {
return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true);
}

static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -181,12 +189,14 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
if (useSchemaProviderClass) {
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
}
return cfg;
}

static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
boolean addReadLatestOnMissingCkpt) {
boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips_copy";
@@ -196,6 +206,9 @@ static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, S
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
cfg.sourceLimit = 1000;
if (null != schemaProviderClassName) {
cfg.schemaProviderClassName = schemaProviderClassName;
}
List<String> cfgs = new ArrayList<>();
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
@@ -412,7 +425,8 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t

// Now incrementally pull from the above hudi table and ingest to second table
HoodieDeltaStreamer.Config downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT, true);
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT,
true, null);
new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -428,7 +442,10 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);

// with no change in upstream table, no change in downstream too when pulled.
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
HoodieDeltaStreamer.Config downstreamCfg1 =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath,
Operation.BULK_INSERT, true, DummySchemaProvider.class.getName());
new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -447,7 +464,8 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t

// Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg =
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT, false);
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT,
false, null);
downstreamCfg.sourceLimit = 2000;
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
TestHelpers.assertRecordCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
@@ -467,6 +485,21 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
hiveClient.getLastCommitTimeSynced().get());
}

@Test
public void testNullSchemaProvider() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
false);
try {
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
fail("Should error out when schema provider is not provided");
} catch (HoodieException e) {
log.error("Expected error during reading data from source ", e);
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
}
}

@Test
public void testFilterDupes() throws Exception {
String datasetBasePath = dfsBasePath + "/test_dupes_dataset";
@@ -577,4 +610,16 @@ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Ro
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
}

public static class DummySchemaProvider extends SchemaProvider {

public DummySchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
}

@Override
public Schema getSourceSchema() {
return Schema.create(Schema.Type.NULL);
}
}
}
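
A closing note on the shape of DummySchemaProvider: it only needs the (TypedProperties, JavaSparkContext) constructor plus a getSourceSchema() override because the class named by cfg.schemaProviderClassName is loaded reflectively and constructed with exactly those arguments. The factory below is a hypothetical sketch of that wiring, not the actual Hudi helper, shown only to illustrate the contract the dummy provider satisfies:

import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;

public class SchemaProviderFactorySketch {
  // Hypothetical factory, not the real Hudi utility: loads the configured class and
  // invokes its (TypedProperties, JavaSparkContext) constructor.
  static SchemaProvider create(String className, TypedProperties props, JavaSparkContext jssc)
      throws Exception {
    if (className == null) {
      // Mirrors testNullSchemaProvider: no provider class configured, nothing to build.
      return null;
    }
    return (SchemaProvider) Class.forName(className)
        .getConstructor(TypedProperties.class, JavaSparkContext.class)
        .newInstance(props, jssc);
  }
}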