diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 6d86e9350e3ca..330788161afc2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -74,20 +74,30 @@ public class HoodieTestDataGenerator { public static final String[] DEFAULT_PARTITION_PATHS = {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; public static final int DEFAULT_PARTITION_DEPTH = 3; - public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," - + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," - + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [" - + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}}," - + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"; + + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"; + public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"; + public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [" + + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"; + public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"}," + + "{\"name\": \"currency\", \"type\": \"string\"},"; + + public static final String TRIP_EXAMPLE_SCHEMA = + TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + public static final String TRIP_FLATTENED_SCHEMA = + TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX; + public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," + "struct,boolean"; + public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA); + public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA); private static final Random RAND = new Random(46474747); @@ -115,10 +125,33 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths } /** - * Generates a new avro record of the above schema format, retaining the key if optionally provided. + * Generates a new avro record of the above nested schema format, + * retaining the key if optionally provided. + * + * @param key Hoodie key. + * @param commitTime Commit time to use. + * @return Raw paylaod of a test record. 
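The prefix/suffix constants above compose into two Avro schemas that differ only in how fare is represented; a minimal sketch of that difference, assuming only the constants shown in this diff (the wrapper class and printed values are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.HoodieTestDataGenerator;

public class TripSchemaExample {
  public static void main(String[] args) {
    Schema nested = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
    Schema flattened = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_FLATTENED_SCHEMA);

    // Nested schema: fare is a record holding amount and currency
    System.out.println(nested.getField("fare").schema().getType());        // RECORD
    // Flattened schema: fare is a plain double and currency is a top-level string,
    // which Spark's CSV reader can handle without nested types
    System.out.println(flattened.getField("fare").schema().getType());     // DOUBLE
    System.out.println(flattened.getField("currency").schema().getType()); // STRING
  }
}
```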
+ * @throws IOException */ public static TestRawTripPayload generateRandomValue(HoodieKey key, String commitTime) throws IOException { - GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0); + return generateRandomValue(key, commitTime, false); + } + + /** + * Generates a new avro record with the specified schema (nested or flattened), + * retaining the key if optionally provided. + * + * @param key Hoodie key. + * @param commitTime Commit time to use. + * @param isFlattened whether the schema of the record should be flattened. + * @return Raw paylaod of a test record. + * @throws IOException + */ + public static TestRawTripPayload generateRandomValue( + HoodieKey key, String commitTime, boolean isFlattened) throws IOException { + GenericRecord rec = generateGenericRecord( + key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0, + false, isFlattened); return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); } @@ -127,7 +160,7 @@ public static TestRawTripPayload generateRandomValue(HoodieKey key, String commi */ public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException { GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0, - true); + true, false); return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA); } @@ -141,12 +174,13 @@ public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commit public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, double timestamp) { - return generateGenericRecord(rowKey, riderName, driverName, timestamp, false); + return generateGenericRecord(rowKey, riderName, driverName, timestamp, false, false); } public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, - double timestamp, boolean isDeleteRecord) { - GenericRecord rec = new GenericData.Record(AVRO_SCHEMA); + double timestamp, boolean isDeleteRecord, + boolean isFlattened) { + GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA); rec.put("_row_key", rowKey); rec.put("timestamp", timestamp); rec.put("rider", riderName); @@ -156,10 +190,15 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam rec.put("end_lat", RAND.nextDouble()); rec.put("end_lon", RAND.nextDouble()); - GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); - fareRecord.put("amount", RAND.nextDouble() * 100); - fareRecord.put("currency", "USD"); - rec.put("fare", fareRecord); + if (isFlattened) { + rec.put("fare", RAND.nextDouble() * 100); + rec.put("currency", "USD"); + } else { + GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); + fareRecord.put("amount", RAND.nextDouble() * 100); + fareRecord.put("currency", "USD"); + rec.put("fare", fareRecord); + } if (isDeleteRecord) { rec.put("_hoodie_is_deleted", true); @@ -230,16 +269,31 @@ public static void createSavepointFile(String basePath, String commitTime, Confi } /** - * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + * Generates new inserts with nested schema, uniformly across the partition paths above. + * It also updates the list of existing keys. 
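The new overloads introduced in this file take an isFlattened flag; a hedged usage sketch, where the commit times, record counts, and wrapper class are arbitrary example values:

```java
import java.util.List;

import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;

public class FlattenedInsertsExample {
  public static void main(String[] args) {
    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

    // Existing behavior: records with the nested fare record
    List<HoodieRecord> nested = dataGen.generateInserts("000", 5);

    // New overload: records conforming to TRIP_FLATTENED_SCHEMA, with fare and
    // currency as top-level fields (used by the CSV source tests in this patch)
    List<HoodieRecord> flattened = dataGen.generateInserts("001", 5, true);

    System.out.println(nested.size() + " nested / " + flattened.size() + " flattened records");
  }
}
```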
*/ public List generateInserts(String commitTime, Integer n) { - return generateInsertsStream(commitTime, n).collect(Collectors.toList()); + return generateInserts(commitTime, n, false); + } + + /** + * Generates new inserts, uniformly across the partition paths above. + * It also updates the list of existing keys. + * + * @param commitTime Commit time to use. + * @param n Number of records. + * @param isFlattened whether the schema of the generated record is flattened + * @return List of {@link HoodieRecord}s + */ + public List generateInserts(String commitTime, Integer n, boolean isFlattened) { + return generateInsertsStream(commitTime, n, isFlattened).collect(Collectors.toList()); } /** * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. */ - public Stream generateInsertsStream(String commitTime, Integer n) { + public Stream generateInsertsStream( + String commitTime, Integer n, boolean isFlattened) { int currSize = getNumExistingKeys(); return IntStream.range(0, n).boxed().map(i -> { @@ -251,7 +305,7 @@ public Stream generateInsertsStream(String commitTime, Integer n) existingKeys.put(currSize + i, kp); numExistingKeys++; try { - return new HoodieRecord(key, generateRandomValue(key, commitTime)); + return new HoodieRecord(key, generateRandomValue(key, commitTime, isFlattened)); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index f82243b07e35b..2324ae8da78b1 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -137,6 +137,11 @@ com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + ${fasterxml.version} + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java new file mode 100644 index 0000000000000..b8ccd6e90286f --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.List; + +/** + * Reads data from CSV files on DFS as the data source. + * + * Internally, we use Spark to read CSV files thus any limitation of Spark CSV also applies here + * (e.g., limited support for nested schema). + * + * You can set the CSV-specific configs in the format of hoodie.deltastreamer.csv.* + * that are Spark compatible to deal with CSV files in Hudi. The supported options are: + * + * "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment", + * "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace", + * "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf", + * "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn", + * "mode", "columnNameOfCorruptRecord", "multiLine" + * + * Detailed information of these CSV options can be found at: + * https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq- + * + * If the source Avro schema is provided through the {@link org.apache.hudi.utilities.schema.FilebasedSchemaProvider} + * using "hoodie.deltastreamer.schemaprovider.source.schema.file" config, the schema is + * passed to the CSV reader without inferring the schema from the CSV file. 
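To make the configuration pass-through concrete, here is a minimal sketch of the properties a CsvDFSSource run might be given, mirroring the keys exercised by the tests later in this patch; the paths and helper class are hypothetical:

```java
import org.apache.hudi.common.util.TypedProperties;

public class CsvSourcePropsExample {
  public static TypedProperties csvSourceProps(String dfsBasePath) {
    TypedProperties props = new TypedProperties();
    // Directory holding the CSV files to ingest
    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/csvFiles");
    // Spark CSV options, passed through via the hoodie.deltastreamer.csv.* prefix
    props.setProperty("hoodie.deltastreamer.csv.header", "true");
    props.setProperty("hoodie.deltastreamer.csv.sep", "\\t"); // escaped tab, as in the tests below
    // Optional: when a source Avro schema is provided, CsvDFSSource disables schema inference
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
        dfsBasePath + "/source-flattened.avsc");
    return props;
  }
}
```

Only the whitelisted option names in CSV_CONFIG_KEYS, read with this prefix, are forwarded to the Spark DataFrameReader.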
*/ +public class CsvDFSSource extends RowSource { + // CsvSource config prefix + public static final String CSV_SRC_CONFIG_PREFIX = "hoodie.deltastreamer.csv."; + // CSV-specific configurations to pass in from Hudi to Spark + public static final List<String> CSV_CONFIG_KEYS = Arrays.asList( + "sep", "encoding", "quote", "escape", "charToEscapeQuoteEscaping", "comment", + "header", "enforceSchema", "inferSchema", "samplingRatio", "ignoreLeadingWhiteSpace", + "ignoreTrailingWhiteSpace", "nullValue", "emptyValue", "nanValue", "positiveInf", + "negativeInf", "dateFormat", "timestampFormat", "maxColumns", "maxCharsPerColumn", + "mode", "columnNameOfCorruptRecord", "multiLine" + ); + + private final DFSPathSelector pathSelector; + private final StructType sourceSchema; + + public CsvDFSSource(TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + this.pathSelector = new DFSPathSelector(props, sparkContext.hadoopConfiguration()); + if (schemaProvider != null) { + sourceSchema = (StructType) SchemaConverters.toSqlType(schemaProvider.getSourceSchema()) + .dataType(); + } else { + sourceSchema = null; + } + } + + @Override + protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, + long sourceLimit) { + Pair<Option<String>, String> selPathsWithMaxModificationTime = + pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit); + return Pair.of(fromFiles( + selPathsWithMaxModificationTime.getLeft()), selPathsWithMaxModificationTime.getRight()); + } + + /** + * Reads the CSV files and parses the lines into {@link Dataset} of {@link Row}. + * + * @param pathStr The list of file paths, separated by ','. + * @return {@link Dataset} of {@link Row} containing the records.
+ */ + private Option> fromFiles(Option pathStr) { + if (pathStr.isPresent()) { + DataFrameReader dataFrameReader = sparkSession.read().format("csv"); + CSV_CONFIG_KEYS.forEach(optionKey -> { + String configPropName = CSV_SRC_CONFIG_PREFIX + optionKey; + String value = props.getString(configPropName, null); + // Pass down the Hudi CSV configs to Spark DataFrameReader + if (value != null) { + dataFrameReader.option(optionKey, value); + } + }); + if (sourceSchema != null) { + // Source schema is specified, pass it to the reader + dataFrameReader.schema(sourceSchema); + } + dataFrameReader.option("inferSchema", Boolean.toString(sourceSchema == null)); + + return Option.of(dataFrameReader.load(pathStr.get().split(","))); + } else { + return Option.empty(); + } + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 100faa210bbe3..9224be0947870 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -19,8 +19,8 @@ package org.apache.hudi.utilities; import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; @@ -42,6 +42,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.DistributedTestDataSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.hudi.utilities.sources.InputBatch; @@ -60,6 +61,7 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; @@ -98,12 +100,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final Random RANDOM = new Random(); private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties"; private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; + private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; private static final int PARQUET_NUM_RECORDS = 5; + private static final int CSV_NUM_RECORDS = 3; private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); - private static int parquetTestNum = 1; + private static int testNum = 1; @BeforeClass public static void initClass() throws Exception { @@ -114,7 +118,9 @@ public static void initClass() throws Exception { UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, dfsBasePath + "/sql-transformer.properties"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc"); UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc", dfs, dfsBasePath + "/target-flattened.avsc"); TypedProperties props = new TypedProperties(); props.setProperty("include", "sql-transformer.properties"); @@ -197,12 +203,12 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass, String payloadClassName, String tableType) { return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync, - useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType); + useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp"); } static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName, String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, - int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) { + int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); cfg.targetBasePath = basePath; cfg.targetTableName = "hoodie_trips"; @@ -211,7 +217,7 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri cfg.transformerClassName = transformerClassName; cfg.operation = op; cfg.enableHiveSync = enableHiveSync; - cfg.sourceOrderingField = "timestamp"; + cfg.sourceOrderingField = sourceOrderingField; cfg.propsFilePath = dfsBasePath + "/" + propsFilename; cfg.sourceLimit = sourceLimit; if (updatePayloadClass) { @@ -653,7 +659,7 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans if (useSchemaProvider) { parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); if (hasTransformer) { - parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc"); + parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); } } parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT); @@ -663,14 +669,14 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception { prepareParquetDFSSource(useSchemaProvider, transformerClassName != null); - String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum; + String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(), transformerClassName, PROPS_FILENAME_TEST_PARQUET, false, - useSchemaProvider, 100000, false, null, null), jsc); + useSchemaProvider, 100000, false, null, null, "timestamp"), jsc); deltaStreamer.sync(); TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); - parquetTestNum++; + testNum++; } @Test @@ -693,6 
+699,146 @@ public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName()); } + private void prepareCsvDFSSource( + boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException { + String sourceRoot = dfsBasePath + "/csvFiles"; + String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0"; + + // Properties used for testing delta-streamer with CSV source + TypedProperties csvProps = new TypedProperties(); + csvProps.setProperty("include", "base.properties"); + csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField); + csvProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + if (useSchemaProvider) { + csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source-flattened.avsc"); + if (hasTransformer) { + csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target-flattened.avsc"); + } + } + csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot); + + if (sep != ',') { + if (sep == '\t') { + csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t"); + } else { + csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep)); + } + } + if (hasHeader) { + csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader)); + } + + UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_CSV); + + String path = sourceRoot + "/1.csv"; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + UtilitiesTestBase.Helpers.saveCsvToDFS( + hasHeader, sep, + Helpers.jsonifyRecords(dataGenerator.generateInserts("000", CSV_NUM_RECORDS, true)), + dfs, path); + } + + private void testCsvDFSSource( + boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception { + prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null); + String tableBasePath = dfsBasePath + "/test_csv_table" + testNum; + String sourceOrderingField = (hasHeader || useSchemaProvider) ? 
"timestamp" : "_c0"; + HoodieDeltaStreamer deltaStreamer = + new HoodieDeltaStreamer(TestHelpers.makeConfig( + tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(), + transformerClassName, PROPS_FILENAME_TEST_CSV, false, + useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); + testNum++; + } + + @Test + public void testCsvDFSSourceWithHeaderWithoutSchemaProviderAndNoTransformer() throws Exception { + // The CSV files have header, the columns are separated by ',', the default separator + // No schema provider is specified, no transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files + testCsvDFSSource(true, ',', false, null); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndNoTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t', + // which is passed in through the Hudi CSV properties + // No schema provider is specified, no transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files + testCsvDFSSource(true, '\t', false, null); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndNoTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t' + // File schema provider is used, no transformer is applied + // In this case, the source schema comes from the source Avro schema file + testCsvDFSSource(true, '\t', true, null); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithoutSchemaProviderAndWithTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t' + // No schema provider is specified, transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files. 
+ // Target schema is determined based on the Dataframe after transformation + testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName()); + } + + @Test + public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception { + // The CSV files have header, the columns are separated by '\t' + // File schema provider is used, transformer is applied + // In this case, the source and target schema come from the Avro schema files + testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName()); + } + + @Test + public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndNoTransformer() throws Exception { + // The CSV files do not have header, the columns are separated by '\t', + // which is passed in through the Hudi CSV properties + // No schema provider is specified, no transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files + // No CSV header and no schema provider at the same time are not recommended + // as the column names are not informative + testCsvDFSSource(false, '\t', false, null); + } + + @Test + public void testCsvDFSSourceNoHeaderWithSchemaProviderAndNoTransformer() throws Exception { + // The CSV files do not have header, the columns are separated by '\t' + // File schema provider is used, no transformer is applied + // In this case, the source schema comes from the source Avro schema file + testCsvDFSSource(false, '\t', true, null); + } + + @Test + public void testCsvDFSSourceNoHeaderWithoutSchemaProviderAndWithTransformer() throws Exception { + // The CSV files do not have header, the columns are separated by '\t' + // No schema provider is specified, transformer is applied + // In this case, the source schema comes from the inferred schema of the CSV files. + // Target schema is determined based on the Dataframe after transformation + // No CSV header and no schema provider at the same time are not recommended, + // as the transformer behavior may be unexpected + try { + testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName()); + fail("Should error out when doing the transformation."); + } catch (AnalysisException e) { + LOG.error("Expected error during transformation", e); + assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:")); + } + } + + @Test + public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception { + // The CSV files do not have header, the columns are separated by '\t' + // File schema provider is used, transformer is applied + // In this case, the source and target schema come from the Avro schema files + testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName()); + } + /** * UDF to calculate Haversine distance. 
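For context on the TripsWithDistanceTransformer used throughout these tests, a standard haversine computation is sketched below; this is an assumption about what such a distance UDF computes, not necessarily the exact code in the test helper:

```java
public final class HaversineExample {
  private static final double EARTH_RADIUS_KM = 6371.0;

  // Great-circle distance between two (lat, lon) points, in kilometers
  public static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
    double dLat = Math.toRadians(lat2 - lat1);
    double dLon = Math.toRadians(lon2 - lon1);
    double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
        + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
        * Math.sin(dLon / 2) * Math.sin(dLon / 2);
    return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
  }

  public static void main(String[] args) {
    // San Francisco to Los Angeles, roughly 559 km
    System.out.println(haversineKm(37.7749, -122.4194, 34.0522, -118.2437));
  }
}
```

This also explains the failing no-header case above: without a header or source schema the columns are only addressable as _c0, _c1, ..., so the transformer's reference to begin_lat cannot be resolved, which is the AnalysisException the test asserts.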
*/ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java index 1fcd99ad1ec19..abf65786625f3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java @@ -27,11 +27,20 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.hive.util.HiveTestService; import org.apache.hudi.utilities.sources.TestDataSource; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; import com.google.common.collect.ImmutableList; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -42,6 +51,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.server.HiveServer2; import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetFileWriter.Mode; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; @@ -56,6 +66,7 @@ import java.io.InputStreamReader; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -72,6 +83,7 @@ public class UtilitiesTestBase { protected transient SparkSession sparkSession = null; protected transient SQLContext sqlContext; protected static HiveServer2 hiveServer; + private static ObjectMapper mapper = new ObjectMapper(); @BeforeClass public static void initClass() throws Exception { @@ -193,9 +205,47 @@ public static void saveStringsToDFS(String[] lines, FileSystem fs, String target os.close(); } + /** + * Converts the json records into CSV format and writes to a file. + * + * @param hasHeader whether the CSV file should have a header line. + * @param sep the column separator to use. + * @param lines the records in JSON format. + * @param fs {@link FileSystem} instance. + * @param targetPath File path. 
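saveCsvToDFS below builds a Jackson CsvSchema from the first record's field names and writes all records through a CsvMapper; a self-contained sketch of the same conversion against a local file, with illustrative field values and output path:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.File;
import java.io.IOException;

public class JsonToCsvExample {
  public static void main(String[] args) throws IOException {
    String[] jsonLines = {
        "{\"_row_key\":\"key1\",\"timestamp\":0.0,\"fare\":12.3,\"currency\":\"USD\"}",
        "{\"_row_key\":\"key2\",\"timestamp\":1.0,\"fare\":45.6,\"currency\":\"USD\"}"
    };

    ObjectMapper mapper = new ObjectMapper();
    ArrayNode arrayNode = mapper.createArrayNode();
    for (String line : jsonLines) {
      arrayNode.add(mapper.readValue(line, ObjectNode.class));
    }

    // Column names (and order) are taken from the first record, as in the helper below
    CsvSchema.Builder schemaBuilder = CsvSchema.builder();
    arrayNode.get(0).fieldNames().forEachRemaining(schemaBuilder::addColumn);

    ObjectWriter csvWriter = new CsvMapper()
        .writerFor(JsonNode.class)
        .with(schemaBuilder.setUseHeader(true).setColumnSeparator('\t').build());
    csvWriter.writeValue(new File("/tmp/trips.csv"), arrayNode);
    // /tmp/trips.csv now holds a tab-separated header line
    // (_row_key, timestamp, fare, currency) followed by one line per record
  }
}
```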
+ * @throws IOException + */ + public static void saveCsvToDFS( + boolean hasHeader, char sep, + String[] lines, FileSystem fs, String targetPath) throws IOException { + Builder csvSchemaBuilder = CsvSchema.builder(); + + ArrayNode arrayNode = mapper.createArrayNode(); + Arrays.stream(lines).forEachOrdered( + line -> { + try { + arrayNode.add(mapper.readValue(line, ObjectNode.class)); + } catch (IOException e) { + throw new HoodieIOException( + "Error converting json records into CSV format: " + e.getMessage()); + } + }); + arrayNode.get(0).fieldNames().forEachRemaining(csvSchemaBuilder::addColumn); + ObjectWriter csvObjWriter = new CsvMapper() + .writerFor(JsonNode.class) + .with(csvSchemaBuilder.setUseHeader(hasHeader).setColumnSeparator(sep).build()); + PrintStream os = new PrintStream(fs.create(new Path(targetPath), true)); + csvObjWriter.writeValue(os, arrayNode); + os.flush(); + os.close(); + } + public static void saveParquetToDFS(List records, Path targetFile) throws IOException { try (ParquetWriter writer = AvroParquetWriter.builder(targetFile) - .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) { + .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA) + .withConf(HoodieTestUtils.getDefaultHadoopConf()) + .withWriteMode(Mode.OVERWRITE) + .build()) { for (GenericRecord record : records) { writer.write(record); } @@ -203,9 +253,13 @@ public static void saveParquetToDFS(List records, Path targetFile } public static TypedProperties setupSchemaOnDFS() throws IOException { - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); + return setupSchemaOnDFS("source.avsc"); + } + + public static TypedProperties setupSchemaOnDFS(String filename) throws IOException { + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/" + filename, dfs, dfsBasePath + "/" + filename); TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); + props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + filename); return props; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java index c8dc5d5319901..175edde652e63 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java @@ -123,7 +123,7 @@ protected static Stream fetchNextBatch(TypedProperties props, int updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates) .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); } - Stream insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts) + Stream insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts, false) .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator)); return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream)); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java index 5815317d9281b..42cbebc5f8d43 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java @@ -56,6 +56,7 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase { String dfsRoot; String fileSuffix; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + boolean useFlattenedSchema = false; @BeforeClass public static void initClass() throws Exception { @@ -105,7 +106,7 @@ public void teardown() throws Exception { */ Path generateOneFile(String filename, String commitTime, int n) throws IOException { Path path = new Path(dfsRoot, filename + fileSuffix); - writeNewDataToFile(dataGenerator.generateInserts(commitTime, n), path); + writeNewDataToFile(dataGenerator.generateInserts(commitTime, n, useFlattenedSchema), path); return path; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java new file mode 100644 index 0000000000000..fbb6d8f54380d --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.utilities.UtilitiesTestBase; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; + +import org.apache.hadoop.fs.Path; +import org.junit.Before; + +import java.io.IOException; +import java.util.List; + +/** + * Basic tests for {@link CsvDFSSource}. 
+ */ +public class TestCsvDFSSource extends AbstractDFSSourceTestBase { + + @Before + public void setup() throws Exception { + super.setup(); + this.dfsRoot = dfsBasePath + "/jsonFiles"; + this.fileSuffix = ".json"; + this.useFlattenedSchema = true; + this.schemaProvider = new FilebasedSchemaProvider( + Helpers.setupSchemaOnDFS("source-flattened.avsc"), jsc); + } + + @Override + Source prepareDFSSource() { + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true)); + props.setProperty("hoodie.deltastreamer.csv.sep", "\t"); + return new CsvDFSSource(props, jsc, sparkSession, schemaProvider); + } + + @Override + void writeNewDataToFile(List records, Path path) throws IOException { + UtilitiesTestBase.Helpers.saveCsvToDFS( + true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString()); + } +} diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc new file mode 100644 index 0000000000000..ed3a7be358efa --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source-flattened.avsc @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "triprec", + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "begin_lat", + "type" : "double" + }, { + "name" : "begin_lon", + "type" : "double" + }, { + "name" : "end_lat", + "type" : "double" + }, { + "name" : "end_lon", + "type" : "double" + }, { + "name" : "fare", + "type" : "double" + }, { + "name" : "currency", + "type" : "string" + }, { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + } ] +} diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc new file mode 100644 index 0000000000000..4e9e4afa37726 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target-flattened.avsc @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "triprec", + "fields" : [ + { + "name" : "timestamp", + "type" : "double" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "begin_lat", + "type" : "double" + }, { + "name" : "begin_lon", + "type" : "double" + }, { + "name" : "end_lat", + "type" : "double" + }, { + "name" : "end_lon", + "type" : "double" + }, { + "name" : "fare", + "type" : "double" + }, { + "name" : "currency", + "type" : "string" + }, { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + }, { + "name" : "haversine_distance", + "type" : "double" + }] +}
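
Tying the pieces together, the configuration the new CSV tests effectively build can be sketched as below; the paths, table name, and wrapper class are illustrative, and only fields visible in this patch's makeConfig(...) are set:

```java
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.sources.CsvDFSSource;

public class CsvDeltaStreamerConfigExample {
  public static HoodieDeltaStreamer.Config csvSourceConfig(String dfsBasePath) {
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.targetBasePath = dfsBasePath + "/test_csv_table";
    cfg.targetTableName = "hoodie_trips";
    cfg.sourceClassName = CsvDFSSource.class.getName();
    cfg.operation = Operation.INSERT;
    cfg.propsFilePath = dfsBasePath + "/test-csv-dfs-source.properties";
    // "timestamp" when the CSV has a header or a source schema is provided;
    // the tests fall back to the positional "_c0" column otherwise
    cfg.sourceOrderingField = "timestamp";
    cfg.sourceLimit = 1000;
    return cfg;
  }
}
```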