diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
new file mode 100644
index 0000000000000..6937a3389b1f4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.HoodieJsonPayload;
+import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Loads data from Parquet sources.
+ */
+public class HDFSParquetImporterUtils implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HDFSParquetImporterUtils.class);
+  private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
+      .withZone(ZoneId.systemDefault());
+
+  private final String command;
+  private final String srcPath;
+  private final String targetPath;
+  private final String tableName;
+  private final String tableType;
+  private final String rowKey;
+  private final String partitionKey;
+  private final int parallelism;
+  private final String schemaFile;
+  private int retry;
+  private final String propsFilePath;
+  private final List<String> configs = new ArrayList<>();
+  private TypedProperties props;
+
+  public HDFSParquetImporterUtils(
+      String command,
+      String srcPath,
+      String targetPath,
+      String tableName,
+      String tableType,
+      String rowKey,
+      String partitionKey,
+      int parallelism,
+      String schemaFile,
+      int retry,
+      String propsFilePath) {
+    this.command = command;
+    this.srcPath = srcPath;
+    this.targetPath = targetPath;
+    this.tableName = tableName;
+    this.tableType = tableType;
+    this.rowKey = rowKey;
+    this.partitionKey = partitionKey;
+    this.parallelism = parallelism;
+    this.schemaFile = schemaFile;
+    this.retry = retry;
+    this.propsFilePath = propsFilePath;
+  }
+
+  public boolean isUpsert() {
+    return "upsert".equalsIgnoreCase(this.command);
+  }
+
+  public int dataImport(JavaSparkContext jsc) {
+    FileSystem fs = FSUtils.getFs(this.targetPath, jsc.hadoopConfiguration());
+    this.props = this.propsFilePath == null || this.propsFilePath.isEmpty() ? buildProperties(this.configs)
+        : readConfig(fs.getConf(), new Path(this.propsFilePath), this.configs).getProps(true);
+    LOG.info("Starting data import with configs : " + props.toString());
+    int ret = -1;
+    try {
+      // Verify that targetPath is not present.
+      if (fs.exists(new Path(this.targetPath)) && !isUpsert()) {
+        throw new HoodieIOException(String.format("Make sure %s is not present.", this.targetPath));
+      }
+      do {
+        ret = dataImport(jsc, fs);
+      } while (ret != 0 && retry-- > 0);
+    } catch (Throwable t) {
+      LOG.error("dataImport failed", t);
+    }
+    return ret;
+  }
+
+  public int dataImport(JavaSparkContext jsc, FileSystem fs) {
+    try {
+      if (fs.exists(new Path(this.targetPath)) && !isUpsert()) {
+        // Clean up the target directory.
+        fs.delete(new Path(this.targetPath), true);
+      }
+
+      if (!fs.exists(new Path(this.targetPath))) {
+        // Initialize the target Hudi table.
+        Properties properties = HoodieTableMetaClient.withPropertyBuilder()
+            .setTableName(this.tableName)
+            .setTableType(this.tableType)
+            .build();
+        HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), this.targetPath, properties);
+      }
+
+      // Get schema.
+      String schemaStr = parseSchema(fs, this.schemaFile);
+
+      SparkRDDWriteClient<HoodieRecordPayload> client =
+          createHoodieClient(jsc, this.targetPath, schemaStr, this.parallelism, Option.empty(), props);
+
+      JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
+      // Get instant time.
+      String instantTime = client.startCommit();
+      JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
+      return handleErrors(jsc, instantTime, writeResponse);
+    } catch (Throwable t) {
+      LOG.error("Error occurred.", t);
+    }
+    return -1;
+  }
+
+  public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc,
+      String schemaStr) throws IOException {
+    Job job = Job.getInstance(jsc.hadoopConfiguration());
+    // Allow recursive directories to be found.
+    job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+    // To parallelize reading file status.
+    job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
+    AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
+    ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
+
+    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
+    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.tableName);
+    return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
+            job.getConfiguration())
+        // To reduce a large number of tasks.
+        .coalesce(16 * this.parallelism).map(entry -> {
+          GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
+          Object partitionField = genericRecord.get(this.partitionKey);
+          if (partitionField == null) {
+            throw new HoodieIOException("partition key is missing: " + this.partitionKey);
+          }
+          Object rowField = genericRecord.get(this.rowKey);
+          if (rowField == null) {
+            throw new HoodieIOException("row field is missing: " + this.rowKey);
+          }
+          String partitionPath = partitionField.toString();
+          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+          if (partitionField instanceof Number) {
+            try {
+              long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
+              partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+            } catch (NumberFormatException nfe) {
+              LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+            }
+          }
+          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
+              new HoodieJsonPayload(genericRecord.toString()));
+        });
+  }
+
+  /**
+   * Imports records to the Hudi table.
+   *
+   * @param client        Hoodie Client
+   * @param instantTime   Instant Time
+   * @param hoodieRecords Hoodie Records
+   * @param <T>           Payload Type
+   */
+  public <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
+      JavaRDD<HoodieRecord<T>> hoodieRecords) {
+    switch (this.command.toLowerCase()) {
+      case "upsert": {
+        return client.upsert(hoodieRecords, instantTime);
+      }
+      case "bulkinsert": {
+        return client.bulkInsert(hoodieRecords, instantTime);
+      }
+      default: {
+        return client.insert(hoodieRecords, instantTime);
+      }
+    }
+  }
+
+  public static TypedProperties buildProperties(List<String> props) {
+    TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
+    props.forEach(x -> {
+      String[] kv = x.split("=");
+      ValidationUtils.checkArgument(kv.length == 2);
+      properties.setProperty(kv[0], kv[1]);
+    });
+    return properties;
+  }
+
+  public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
+    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
+    try {
+      if (!overriddenProps.isEmpty()) {
+        LOG.info("Adding overridden properties to file properties.");
+        conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unexpected error adding config overrides", ioe);
+    }
+
+    return conf;
+  }
+
+  /**
+   * Build Hoodie write client.
+   *
+   * @param jsc         Java Spark Context
+   * @param basePath    Base Path
+   * @param schemaStr   Schema
+   * @param parallelism Parallelism
+   */
+  public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
+      int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
+    HoodieCompactionConfig compactionConfig = compactionStrategyClass
+        .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
+            .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+        .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withParallelism(parallelism, parallelism)
+            .withBulkInsertParallelism(parallelism)
+            .withDeleteParallelism(parallelism)
+            .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withProps(properties).build();
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config);
+  }
+
+  /**
+   * Parse Schema from file.
+   *
+   * @param fs         File System
+   * @param schemaFile Schema File
+   */
+  public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
+    // Read schema file.
+    Path p = new Path(schemaFile);
+    if (!fs.exists(p)) {
+      throw new Exception(String.format("Could not find - %s - schema file.", schemaFile));
+    }
+    long len = fs.getFileStatus(p).getLen();
+    ByteBuffer buf = ByteBuffer.allocate((int) len);
+    try (FSDataInputStream inputStream = fs.open(p)) {
+      inputStream.readFully(0, buf.array(), 0, buf.array().length);
+    }
+    return new String(buf.array(), StandardCharsets.UTF_8);
+  }
+
+  public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
+    LongAccumulator errors = jsc.sc().longAccumulator();
+    writeResponse.foreach(writeStatus -> {
+      if (writeStatus.hasErrors()) {
+        errors.add(1);
+        LOG.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
+      }
+    });
+    if (errors.value() == 0) {
+      LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
+      return 0;
+    }
+    LOG.error(String.format("Import failed with %d errors.", errors.value()));
+    return -1;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
new file mode 100644
index 0000000000000..1589d230cecc9
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.cli.HDFSParquetImporterUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.language.higherKinds
+
+class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "srcPath", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "targetPath", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "rowKey", DataTypes.StringType, None),
+    ProcedureParameter.required(5, "partitionKey", DataTypes.StringType, None),
+    ProcedureParameter.required(6, "schemaFilePath", DataTypes.StringType, None),
+    ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
+    ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
+    ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
+    ProcedureParameter.optional(10, "parallelism", DataTypes.IntegerType, jsc.defaultParallelism),
+    ProcedureParameter.optional(11, "propsFilePath", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("import_result", DataTypes.IntegerType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val tableType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val srcPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val targetPath = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val rowKey = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val partitionKey = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val schemaFilePath = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+    val format = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+    val command = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String]
+    val retry = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Int]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(10)).get.asInstanceOf[Int]
+    val propsFilePath = getArgValueOrDefault(args, PARAMETERS(11)).get.asInstanceOf[String]
+
+    val parquetImporterUtils: HDFSParquetImporterUtils = new HDFSParquetImporterUtils(command, srcPath, targetPath,
+      tableName, tableType, rowKey, partitionKey, parallelism, schemaFilePath, retry, propsFilePath)
+
+    Seq(Row(parquetImporterUtils.dataImport(jsc)))
+  }
+
+  override def build = new HdfsParquetImportProcedure()
+}
+
+object HdfsParquetImportProcedure {
+  val NAME = "hdfs_parquet_import"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new HdfsParquetImportProcedure()
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 7f29f4b86f871..33ca211b03bd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -54,6 +54,7 @@
     mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
     mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
     mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
+    mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHdfsParquetImportProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHdfsParquetImportProcedure.scala
new file mode 100644
index 0000000000000..90e6164c1001d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHdfsParquetImportProcedure.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.assertTrue
+
+import java.io.IOException
+import java.util
+import java.util.Objects
+import java.util.concurrent.TimeUnit
+
+class TestHdfsParquetImportProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call hdfs_parquet_import Procedure with insert operation") {
+    withTempDir { tmp =>
+      val fs: FileSystem = FSUtils.getFs(tmp.getCanonicalPath, spark.sparkContext.hadoopConfiguration)
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + Path.SEPARATOR + tableName
+      val sourcePath = new Path(tmp.getCanonicalPath, "source")
+      val targetPath = new Path(tablePath)
+      val schemaFile = new Path(tmp.getCanonicalPath, "file.schema").toString
+
+      // create schema file
+      val schemaFileOS = fs.create(new Path(schemaFile))
+      try schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes)
+      finally if (schemaFileOS != null) schemaFileOS.close()
+
+      val insertData: util.List[GenericRecord] = createInsertRecords(sourcePath)
+
+      // Check required fields
+      checkExceptionContain(s"""call hdfs_parquet_import(tableType => 'mor')""")(
+        s"Argument: table is required")
+
+      checkAnswer(
+        s"""call hdfs_parquet_import(
+           |table => '$tableName', tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |srcPath => '$sourcePath', targetPath => '$targetPath',
+           |rowKey => '_row_key', partitionKey => 'timestamp',
+           |schemaFilePath => '$schemaFile')""".stripMargin) {
+        Seq(0)
+      }
+
+      verifyResultData(insertData, fs, tablePath)
+    }
+  }
+
+  test("Test Call hdfs_parquet_import Procedure with upsert operation") {
+    withTempDir { tmp =>
+      val fs: FileSystem = FSUtils.getFs(tmp.getCanonicalPath, spark.sparkContext.hadoopConfiguration)
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + Path.SEPARATOR + tableName
+      val sourcePath = new Path(tmp.getCanonicalPath, "source")
+      val targetPath = new Path(tablePath)
+      val schemaFile = new Path(tmp.getCanonicalPath, "file.schema").toString
+
+      // create schema file
+      val schemaFileOS = fs.create(new Path(schemaFile))
+      try schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes)
+      finally if (schemaFileOS != null) schemaFileOS.close()
+
+      val insertData: util.List[GenericRecord] = createUpsertRecords(sourcePath)
+
+      // Check required fields
+      checkExceptionContain(s"""call hdfs_parquet_import(tableType => 'mor')""")(
+        s"Argument: table is required")
+
+      checkAnswer(
+        s"""call hdfs_parquet_import(
+           |table => '$tableName', tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |srcPath => '$sourcePath', targetPath => '$targetPath',
+           |rowKey => '_row_key', partitionKey => 'timestamp',
+           |schemaFilePath => '$schemaFile', command => 'upsert')""".stripMargin) {
+        Seq(0)
+      }
+
+      verifyResultData(insertData, fs, tablePath)
+    }
+  }
+
+  @throws[ParseException]
+  @throws[IOException]
+  def createInsertRecords(srcFolder: Path): util.List[GenericRecord] = {
+    import scala.collection.JavaConversions._
+    val srcFile: Path = new Path(srcFolder.toString, "file1.parquet")
+    val startTime: Long = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime / 1000
+    val records: util.List[GenericRecord] = new util.ArrayList[GenericRecord]
+    for (recordNum <- 0 until 96) {
+      records.add(new HoodieTestDataGenerator().generateGenericRecord(recordNum.toString,
+        "0", "rider-" + recordNum, "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)))
+    }
+    try {
+      val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf).build
+      try {
+        for (record <- records) {
+          writer.write(record)
+        }
+      } finally {
+        if (writer != null) writer.close()
+      }
+    }
+    records
+  }
+
+  @throws[ParseException]
+  @throws[IOException]
+  def createUpsertRecords(srcFolder: Path): util.List[GenericRecord] = {
+    import scala.collection.JavaConversions._
+    val srcFile = new Path(srcFolder.toString, "file1.parquet")
+    val startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime / 1000
+    val records = new util.ArrayList[GenericRecord]
+    // 11 records for update
+    val dataGen = new HoodieTestDataGenerator
+    for (recordNum <- 0 until 11) {
+      records.add(dataGen.generateGenericRecord(recordNum.toString, "0", "rider-upsert-" + recordNum,
+        "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)))
+    }
+    // 4 records for insert
+    for (recordNum <- 96 until 100) {
+      records.add(dataGen.generateGenericRecord(recordNum.toString, "0", "rider-upsert-" + recordNum,
+        "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)))
+    }
+    try {
+      val writer = AvroParquetWriter.builder[GenericRecord](srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf).build
+      try {
+        for (record <- records) {
+          writer.write(record)
+        }
+      } finally {
+        if (writer != null) writer.close()
+      }
+    }
+    records
+  }
+
+  private def verifyResultData(expectData: util.List[GenericRecord], fs: FileSystem, tablePath: String): Unit = {
+    import scala.collection.JavaConversions._
+    val jsc = new JavaSparkContext(spark.sparkContext)
+    val ds = HoodieClientTestUtils.read(jsc, tablePath, spark.sqlContext, fs, tablePath + "/*/*/*/*")
+    val readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList()
+    val result = readData.toList.map((row: Row) =>
+      new HoodieTripModel(row.getLong(0), row.getString(1),
+        row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7))
+    )
+    val expected = expectData.toList.map((g: GenericRecord) => new HoodieTripModel(Long.unbox(g.get("timestamp")),
+      g.get("_row_key").toString, g.get("rider").toString, g.get("driver").toString, g.get("begin_lat").toString.toDouble,
+      g.get("begin_lon").toString.toDouble, g.get("end_lat").toString.toDouble, g.get("end_lon").toString.toDouble))
+
+    assertTrue(expected.size == result.size || (result.containsAll(expected) && expected.containsAll(result)))
+  }
+
+  class HoodieTripModel(
+      var timestamp: Long,
+      var rowKey: String,
+      var rider: String,
+      var driver: String,
+      var beginLat: Double,
+      var beginLon: Double,
+      var endLat: Double,
+      var endLon: Double) {
+    override def equals(o: Any): Boolean = {
+      if (this == o) {
+        true
+      } else if (o == null || (getClass ne o.getClass)) {
+        false
+      } else {
+        val other = o.asInstanceOf[HoodieTripModel]
+        timestamp == other.timestamp && rowKey == other.rowKey && rider == other.rider &&
+          driver == other.driver && beginLat == other.beginLat && beginLon == other.beginLon &&
+          endLat == other.endLat && endLon == other.endLon
+      }
+    }
+
+    override def hashCode: Int = Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon)
+  }
+}
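
Example usage (illustrative only, not part of the patch): once the procedure is registered in HoodieProcedures, it can be invoked through Spark SQL. The sketch below assumes a Hudi-enabled SparkSession named `spark`; the table name, paths, and schema file are hypothetical placeholders, and the call syntax mirrors the tests above.

// Minimal sketch of calling the new hdfs_parquet_import procedure from Scala.
// 'trips', the /tmp paths, and trips.schema are made-up examples.
spark.sql(
  """call hdfs_parquet_import(
    |table => 'trips', tableType => 'COPY_ON_WRITE',
    |srcPath => '/tmp/parquet_src', targetPath => '/tmp/hudi/trips',
    |rowKey => '_row_key', partitionKey => 'timestamp',
    |schemaFilePath => '/tmp/trips.schema', command => 'insert')""".stripMargin
).show()
// The procedure returns a single row with column import_result: 0 on success, -1 on failure.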