@@ -74,20 +74,30 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3;
public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+ "{\"name\": \"currency\", \"type\": \"string\"},";

public static final String TRIP_EXAMPLE_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
public static final String TRIP_FLATTENED_SCHEMA =
Contributor:

Any specific reason for introducing this flattening?

Contributor Author:

Yes, for CSV format, the nested schema is not well supported. So to test the CSV source, we need to generate the test CSV data with a flattened schema.
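For illustration, a minimal sketch (not part of this change; import paths assumed from the current module layout) of how the two schemas differ in the generated payloads:

import java.util.List;

import org.apache.hudi.common.HoodieTestDataGenerator;   // assumed package
import org.apache.hudi.common.model.HoodieRecord;

HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// Nested records are built against TRIP_EXAMPLE_SCHEMA, flattened ones against TRIP_FLATTENED_SCHEMA.
List<HoodieRecord> nested = dataGen.generateInserts("000", 10);
List<HoodieRecord> flattened = dataGen.generateInserts("000", 10, true);
// A nested payload serializes roughly as:
//   {"timestamp": 0.0, "_row_key": "...", "fare": {"amount": 43.3, "currency": "USD"}, ...}
// whereas a flattened payload keeps fare and currency as top-level fields:
//   {"timestamp": 0.0, "_row_key": "...", "fare": 43.3, "currency": "USD", ...}
// so only the flattened form maps one-to-one onto CSV columns.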

TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;

public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
+ "struct<amount:double,currency:string>,boolean";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);

private static final Random RAND = new Random(46474747);

@@ -115,10 +125,33 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
}

/**
* Generates a new avro record of the above schema format, retaining the key if optionally provided.
* Generates a new avro record of the above nested schema format,
* retaining the key if optionally provided.
*
* @param key Hoodie key.
* @param commitTime Commit time to use.
* @return Raw payload of a test record.
* @throws IOException
*/
public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
return generateRandomValue(key, commitTime, false);
}

/**
* Generates a new avro record with the specified schema (nested or flattened),
* retaining the key if optionally provided.
*
* @param key Hoodie key.
* @param commitTime Commit time to use.
* @param isFlattened whether the schema of the record should be flattened.
* @return Raw payload of a test record.
* @throws IOException
*/
public static TestRawTripPayload generateRandomValue(
HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
GenericRecord rec = generateGenericRecord(
key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
false, isFlattened);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}

@@ -127,7 +160,7 @@ public static TestRawTripPayload generateRandomValue(HoodieKey key, String commi
*/
public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
true);
true, false);
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}

@@ -141,12 +174,13 @@ public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commit

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
return generateGenericRecord(rowKey, riderName, driverName, timestamp, false, false);
}

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp, boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
double timestamp, boolean isDeleteRecord,
boolean isFlattened) {
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
rec.put("rider", riderName);
@@ -156,10 +190,15 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());

GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", RAND.nextDouble() * 100);
fareRecord.put("currency", "USD");
rec.put("fare", fareRecord);
if (isFlattened) {
rec.put("fare", RAND.nextDouble() * 100);
rec.put("currency", "USD");
} else {
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", RAND.nextDouble() * 100);
fareRecord.put("currency", "USD");
rec.put("fare", fareRecord);
}

if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
@@ -230,16 +269,31 @@ public static void createSavepointFile(String basePath, String commitTime, Confi
}

/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
* Generates new inserts with nested schema, uniformly across the partition paths above.
* It also updates the list of existing keys.
*/
public List<HoodieRecord> generateInserts(String commitTime, Integer n) {
return generateInsertsStream(commitTime, n).collect(Collectors.toList());
return generateInserts(commitTime, n, false);
}

/**
* Generates new inserts, uniformly across the partition paths above.
* It also updates the list of existing keys.
*
* @param commitTime Commit time to use.
* @param n Number of records.
* @param isFlattened whether the schema of the generated record is flattened
* @return List of {@link HoodieRecord}s
*/
public List<HoodieRecord> generateInserts(String commitTime, Integer n, boolean isFlattened) {
return generateInsertsStream(commitTime, n, isFlattened).collect(Collectors.toList());
}

/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/
public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n) {
public Stream<HoodieRecord> generateInsertsStream(
String commitTime, Integer n, boolean isFlattened) {
int currSize = getNumExistingKeys();

return IntStream.range(0, n).boxed().map(i -> {
@@ -251,7 +305,7 @@ public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n)
existingKeys.put(currSize + i, kp);
numExistingKeys++;
try {
return new HoodieRecord(key, generateRandomValue(key, commitTime));
return new HoodieRecord(key, generateRandomValue(key, commitTime, isFlattened));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
5 changes: 5 additions & 0 deletions hudi-utilities/pom.xml
Expand Up @@ -137,6 +137,11 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>${fasterxml.version}</version>
</dependency>

<!-- Parquet -->
<dependency>
126 changes: 126 additions & 0 deletions hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
* Reads data from CSV files on DFS as the data source.
*
* Internally, we use Spark to read CSV files, so any limitation of Spark CSV also applies here
* (e.g., limited support for nested schema).
*
* You can set Spark-compatible, CSV-specific configs with the prefix hoodie.deltastreamer.csv.*
* to control how the CSV files are read. The supported options are:
*
* "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
* "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
* "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
* "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
* "mode", "columnNameOfCorruptRecord", "multiLine"
*
* Detailed information of these CSV options can be found at:
* https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-
*
* If the source Avro schema is provided through the {@link org.apache.hudi.utilities.schema.FilebasedSchemaProvider}
* using "hoodie.deltastreamer.schemaprovider.source.schema.file" config, the schema is
* passed to the CSV reader without inferring the schema from the CSV file.
*/
public class CsvDFSSource extends RowSource {
// CsvSource config prefix
public static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv.";
// CSV-specific configurations to pass in from Hudi to Spark
public static final List<String> CSV_CONFIG_KEYS = Arrays.asList(
"sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment",
"header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace",
"ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf",
"negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn",
"mode", "columnNameOfCorruptRecord", "multiLine"
);

private final DFSPathSelector pathSelector;
private final StructType sourceSchema;

public CsvDFSSource(TypedProperties props,
JavaSparkContext sparkContext,
SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration());
if (schemaProvider != null) {
sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema())
.dataType();
} else {
sourceSchema = null;
}
}

@Override
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr,
long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return Pair.of(fromFiles(
selPathsWithMaxModificationTime.getLeft()), selPathsWithMaxModificationTime.getRight());
}

/**
* Reads the CSV files and parses the lines into a {@link Dataset} of {@link Row}.
*
* @param pathStr The list of file paths, separated by ','.
* @return {@link Dataset} of {@link Row} containing the records.
*/
private Option<Dataset<Row>> fromFiles(Option<String> pathStr) {
if (pathStr.isPresent()) {
DataFrameReader dataFrameReader = sparkSession.read().format("csv");
CSV_CONFIG_KEYS.forEach(optionKey -> {
String configPropName = CSV_SRC_CONFIG_PREFIX + optionKey;
String value = props.getString(configPropName, null);
// Pass down the Hudi CSV configs to Spark DataFrameReader
if (value != null) {
dataFrameReader.option(optionKey, value);
}
});
if (sourceSchema != null) {
// Source schema is specified, pass it to the reader
dataFrameReader.schema(sourceSchema);
}
dataFrameReader.option("inferSchema", Boolean.toString(sourceSchema == null));

return Option.of(dataFrameReader.load(pathStr.get().split(",")));
} else {
return Option.empty();
}
}
}
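For reference, a minimal sketch (not part of this diff) of how the hoodie.deltastreamer.csv.* options described in the class Javadoc could be supplied via TypedProperties; the file paths below are hypothetical:

import org.apache.hudi.common.util.TypedProperties;

TypedProperties props = new TypedProperties();
// Input directory scanned by DFSPathSelector (hypothetical path).
props.setProperty("hoodie.deltastreamer.source.dfs.root", "/data/input/csv");
// CSV options forwarded to the Spark DataFrameReader as "header" and "sep".
props.setProperty("hoodie.deltastreamer.csv.header", "true");
props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
// Optional: supply the source Avro schema so the reader skips schema inference (hypothetical path).
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "/data/source.avsc");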