@@ -31,12 +31,14 @@ object AvroConversionUtils {
 
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
     val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema.toString, structName, recordNamespace)
+    createRdd(df, avroSchema, structName, recordNamespace)
   }
 
-  def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
     : RDD[GenericRecord] = {
-    val dataType = df.schema
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val avroSchemaAsJsonString = avroSchema.toString
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
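For context on the nullability issue this hunk addresses: the StructType derived from an Avro schema can disagree with the StructType a DataFrame reports, because DataFrames read from most sources mark every column nullable. A minimal standalone Scala sketch, not part of the PR; the SchemaConverters import path is an assumption (Spark 2.4's spark-avro module), and Hudi may route it through a different spark-avro shim:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters // assumed import path
import org.apache.spark.sql.types.StructType

// A record whose only field is non-null in Avro (plain "string", no union with "null").
val avroJson =
  """{"type": "record", "name": "trip", "namespace": "example",
    |  "fields": [{"name": "_row_key", "type": "string"}]}""".stripMargin
val avroSchema = new Schema.Parser().parse(avroJson)

// Derived from the Avro schema: _row_key has nullable = false ...
val fromAvro = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
println(fromAvro.fields.map(f => s"${f.name} nullable=${f.nullable}").mkString(", "))
// ... whereas df.schema for a DataFrame read from, say, Parquet typically reports nullable = true
// for the same column. The new overload side-steps that mismatch by trusting the Avro schema.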
@@ -290,8 +290,19 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      avroRDDOptional = transformed
-          .map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, this.schemaProvider.getTargetSchema(),
+                HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      } else {
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      }
 
-      // Use Transformed Row's schema if not overridden
+      // Use Transformed Row's schema if not overridden. If target schema is not specified
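A usage-level sketch of the same decision from the Scala side, assuming a caller that has the transformed DataFrame and, optionally, the target .avsc contents in hand. The names and the struct-name/namespace strings below are illustrative, not the actual HOODIE_RECORD_* constants:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def toAvroRecords(transformedDf: DataFrame, targetAvsc: Option[String]): RDD[GenericRecord] =
  targetAvsc match {
    // Target schema available (e.g. supplied by a schema provider): let it drive the conversion
    // so nullability comes from the Avro schema rather than from the transformed DataFrame.
    case Some(json) =>
      AvroConversionUtils.createRdd(transformedDf, new Schema.Parser().parse(json),
        "example_struct", "example.namespace")
    // No target schema configured: fall back to deriving the Avro schema from df.schema.
    case None =>
      AvroConversionUtils.createRdd(transformedDf, "example_struct", "example.namespace")
  }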
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;
@@ -64,7 +65,17 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
+            rdd -> (
+                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
+                    // If the source schema is specified through Avro schema,
+                    // pass in the schema for the Row-to-Avro conversion
+                    // to avoid nullability mismatch between Avro schema and Row schema
+                    ? AvroConversionUtils.createRdd(
+                        rdd, r.getSchemaProvider().getSourceSchema(),
+                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+                    : AvroConversionUtils.createRdd(
+                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+            ))
             .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       default:
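The instanceof check above relies on a file-based schema provider resolving getSourceSchema() from an .avsc file named in the properties (the tests below use hoodie.deltastreamer.schemaprovider.source.schema.file). A rough Scala sketch of that resolution, with the Hadoop file-reading details assumed rather than taken from FilebasedSchemaProvider itself:

import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// schemaFile: value of hoodie.deltastreamer.schemaprovider.source.schema.file
def loadSourceSchema(schemaFile: String): Schema = {
  val fs = FileSystem.get(new Configuration())
  val in = fs.open(new Path(schemaFile))
  try new Schema.Parser().parse(in) finally in.close()
}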
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -44,6 +45,7 @@
 import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.config.TestSourceConfig;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
@@ -96,8 +98,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final Random RANDOM = new Random();
   private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+  private static final int PARQUET_NUM_RECORDS = 5;
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
+  private static int parquetTestNum = 1;
+
   @BeforeClass
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
@@ -146,6 +153,8 @@ public static void initClass() throws Exception {
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
   }
 
   @AfterClass
@@ -186,17 +195,24 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
         String payloadClassName, String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
+        String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
       cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
-      cfg.sourceClassName = TestDataSource.class.getName();
+      cfg.sourceClassName = sourceClassName;
       cfg.transformerClassName = transformerClassName;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = "timestamp";
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
-      cfg.sourceLimit = 1000;
+      cfg.sourceLimit = sourceLimit;
       if (updatePayloadClass) {
         cfg.payloadClassName = payloadClassName;
       }
@@ -620,6 +636,62 @@ public void testDistributedTestDataSource() {
     Assert.assertEquals(1000, c);
   }
 
+  private static void prepareParquetDFSFiles(int numRecords) throws IOException {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+        dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path));
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    // Properties used for testing delta-streamer with Parquet source
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      if (hasTransformer) {
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");

[Review thread on the line above]
Contributor: is the key to this property right? Isn't ".....target.schema.file"?
Author: Good catch. I've found and fixed that in #1165.

+      }
+    }
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+  }
+
+  private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+            useSchemaProvider, 100000, false, null, null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    parquetTestNum++;
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
+    testParquetDFSSource(false, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
+    testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
+    testParquetDFSSource(true, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
+    testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */
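Taken together, prepareParquetDFSSource above amounts to a properties file roughly like the following. The keys and values are the ones set in the code; <dfsBasePath> stands for the test DFS base path, and the last key is the one questioned in the review thread above and later corrected to the target.schema.file key in #1165:

include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=not_there
hoodie.deltastreamer.source.dfs.root=<dfsBasePath>/parquetFiles
# only when useSchemaProvider is true:
hoodie.deltastreamer.schemaprovider.source.schema.file=<dfsBasePath>/source.avsc
# only when a transformer is also configured (see the review thread above for the key):
hoodie.deltastreamer.schemaprovider.source.schema.file=<dfsBasePath>/target.avsc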