2 changes: 2 additions & 0 deletions LICENSE
@@ -246,6 +246,8 @@ This product includes code from Apache Spark

* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package

* org.apache.hudi.HoodieSparkUtils.scala copied some methods from org.apache.spark.deploy.SparkHadoopUtil.scala

Copyright: 2014 and onwards The Apache Software Foundation
Home page: http://spark.apache.org/
License: http://www.apache.org/licenses/LICENSE-2.0
8 changes: 8 additions & 0 deletions README.md
@@ -76,6 +76,14 @@ The default Scala version supported is 2.11. To build for Scala 2.12 version, bu
mvn clean package -DskipTests -Dscala-2.12
```

### Build with Spark 3.0.0

The default Spark version supported is 2.4.4. To build for Spark 3.0.0, build using the `spark3` profile

```
mvn clean package -DskipTests -Dspark3
```
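
Spark 3.0.0 builds against Scala 2.12, so the `spark3` profile would typically be combined with the `scala-2.12` profile shown earlier; treat the exact combination below as an assumption rather than part of this change

```
mvn clean package -DskipTests -Dspark3 -Dscala-2.12
```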

### Build without spark-avro module

The default hudi-jar bundles spark-avro module. To build without spark-avro module, build using `spark-shade-unbundle-avro` profile
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.utils;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;

import java.io.Serializable;

public interface SparkRowDeserializer extends Serializable {
Row deserializeRow(InternalRow internalRow);
}
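
A minimal Spark 2.x implementation of this interface could simply wrap a resolved `ExpressionEncoder<Row>`, mirroring the reflection-based `serializeRow` helper added further down in this diff; the class below is an illustrative sketch, not part of this change, and a Spark 3.x variant would go through `encoder.createDeserializer()` instead, since `ExpressionEncoder.fromRow` was removed in Spark 3.

```
import org.apache.hudi.client.utils.SparkRowDeserializer;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

// Hypothetical Spark 2.x implementation: ExpressionEncoder.fromRow(InternalRow) exists only on
// the Spark 2.x line; on Spark 3.x the equivalent is encoder.createDeserializer().apply(internalRow).
public class Spark2RowDeserializer implements SparkRowDeserializer {

  // Expected to be resolved and bound already (encoder.resolveAndBind()).
  private final ExpressionEncoder<Row> encoder;

  public Spark2RowDeserializer(ExpressionEncoder<Row> encoder) {
    this.encoder = encoder;
  }

  @Override
  public Row deserializeRow(InternalRow internalRow) {
    return encoder.fromRow(internalRow);
  }
}
```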
@@ -21,41 +21,15 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConverters._

object AvroConversionUtils {

def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
createRdd(df, avroSchema, structName, recordNamespace)
}

def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
: RDD[GenericRecord] = {
// Use the Avro schema to derive the StructType which has the correct nullability information
val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(dataType).resolveAndBind()
df.queryExecution.toRdd.map(encoder.fromRow)
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
}

def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = {
df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))
}

def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
if (rdd.isEmpty()) {
ss.emptyDataFrame
@@ -72,7 +72,7 @@ public void tearDown() throws Exception {
}

@Test
public void testRowCreateHandle() throws IOException {
public void testRowCreateHandle() throws Exception {
// init config and table
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -113,7 +113,7 @@ public void testRowCreateHandle() throws IOException {
* should be thrown.
*/
@Test
public void testGlobalFailure() throws IOException {
public void testGlobalFailure() throws Exception {
// init config and table
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -179,7 +179,8 @@ public void testInstantiationFailure() throws IOException {
}
}

private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle) throws IOException {
private HoodieInternalWriteStatus writeAndGetWriteStatus(Dataset<Row> inputRows, HoodieRowCreateHandle handle)
throws Exception {
List<InternalRow> internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER);
// issue writes
for (InternalRow internalRow : internalRows) {
@@ -35,7 +35,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -64,7 +63,7 @@ public void tearDown() throws Exception {
}

@Test
public void endToEndTest() throws IOException {
public void endToEndTest() throws Exception {
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build();
for (int i = 0; i < 5; i++) {
// init write support and parquet config
@@ -43,7 +43,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@@ -84,36 +83,32 @@ public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf,
.map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
.collect(Collectors.toList()));

return inputPaths.stream().map(path -> {
setInputPath(jobConf, path);
List<GenericRecord> records = new ArrayList<>();
try {
List<InputSplit> splits = Arrays.asList(inputFormat.getSplits(jobConf, 1));
for (InputSplit split : splits) {
RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
Object key = recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
schema.getFields().stream()
.filter(f -> !projectCols || projectedColumns.contains(f.name()))
.map(f -> Pair.of(projectedSchema.getFields().stream()
.filter(p -> f.name().equals(p.name())).findFirst().get(), f))
.forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
records.add(newRecord.build());
}
List<GenericRecord> records = new ArrayList<>();
try {
FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());

for (InputSplit split : splits) {
RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
Object key = recordReader.createKey();
ArrayWritable writable = (ArrayWritable) recordReader.createValue();
while (recordReader.next(key, writable)) {
GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema);
// writable returns an array with [field1, field2, _hoodie_commit_time,
// _hoodie_commit_seqno]
Writable[] values = writable.get();
schema.getFields().stream()
.filter(f -> !projectCols || projectedColumns.contains(f.name()))
.map(f -> Pair.of(projectedSchema.getFields().stream()
.filter(p -> f.name().equals(p.name())).findFirst().get(), f))
.forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()]));
records.add(newRecord.build());
}
} catch (IOException ie) {
ie.printStackTrace();
}
return records;
}).reduce((a, b) -> {
a.addAll(b);
return a;
}).orElse(new ArrayList<>());
} catch (IOException ie) {
ie.printStackTrace();
}
return records;
}

private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List<String> projectedCols) {
@@ -156,10 +151,4 @@ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf
configurable.setConf(conf);
jobConf.addResource(conf);
}

private static void setInputPath(JobConf jobConf, String inputPath) {
jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
jobConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
jobConf.set("map.input.dir", inputPath);
}
}
@@ -26,6 +26,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

import org.apache.spark.package$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -41,6 +42,8 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -139,11 +142,11 @@ public static Row getRandomValue(String partitionPath, boolean isError) {
* @param rows Dataset<Row>s to be converted
* @return the List of {@link InternalRow}s thus converted.
*/
public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) {
public static List<InternalRow> toInternalRows(Dataset<Row> rows, ExpressionEncoder encoder) throws Exception {
List<InternalRow> toReturn = new ArrayList<>();
List<Row> rowList = rows.collectAsList();
for (Row row : rowList) {
toReturn.add(encoder.toRow(row).copy());
toReturn.add(serializeRow(encoder, row).copy());
}
return toReturn;
}
@@ -173,4 +176,17 @@ public static HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
.withBulkInsertParallelism(2);
}

private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
// TODO remove reflection if Spark 2.x support is dropped
if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
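// Spark 2.x: ExpressionEncoder still has toRow(Object); it is invoked reflectively so this compiles against either Spark line.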
Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
return (InternalRow) spark2method.invoke(encoder, row);
} else {
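// Spark 3.x: toRow was removed; createSerializer() yields an ExpressionEncoder$Serializer whose apply(Object) produces the InternalRow.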
Class<?> serializerClass = Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer");
Object serializer = encoder.getClass().getMethod("createSerializer").invoke(encoder);
Method aboveSpark2method = serializerClass.getMethod("apply", Object.class);
return (InternalRow) aboveSpark2method.invoke(serializer, row);
}
}
}
@@ -171,7 +171,7 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
break;
}
String content = response.returnContent().asString();
return mapper.readValue(content, reference);
return (T) mapper.readValue(content, reference);
}

private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
8 changes: 1 addition & 7 deletions hudi-integ-test/pom.xml
@@ -206,12 +206,11 @@
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.7.4</version>
<version>${fasterxml.jackson.dataformat.yaml.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.7.3</version>
</dependency>

<!-- Fasterxml - Test-->
@@ -220,11 +219,6 @@
<artifactId>jackson-annotations</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
@@ -20,7 +20,7 @@

import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.spark.api.java.JavaRDD;
@@ -49,7 +49,7 @@ public static JavaRDD<GenericRecord> readAvro(SparkSession sparkSession, String
.option(AVRO_SCHEMA_OPTION_KEY, schemaStr)
.load(JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq());

return AvroConversionUtils
return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
.toJavaRDD();
@@ -61,7 +61,7 @@ public static JavaRDD<GenericRecord> readParquet(SparkSession sparkSession, List
Dataset<Row> dataSet = sparkSession.read()
.parquet((JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq()));

return AvroConversionUtils
return HoodieSparkUtils
.createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE)
.toJavaRDD();
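
The `createRdd` overloads removed from `AvroConversionUtils` earlier in this diff are now called on `HoodieSparkUtils` with the same three-argument signature, so callers such as the two readers above only swap the class name. A caller-side sketch, with the struct name and namespace literals chosen purely for illustration:

```
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.HoodieSparkUtils;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class HoodieSparkUtilsExample {

  // Converts a DataFrame into Avro GenericRecords via the relocated helper and hands back a JavaRDD.
  public static JavaRDD<GenericRecord> toAvroRecords(Dataset<Row> df) {
    return HoodieSparkUtils
        .createRdd(df.toDF(), "hoodie_source_struct", "hoodie.source.namespace")
        .toJavaRDD();
  }
}
```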
@@ -61,9 +61,9 @@ public abstract class ITTestBase {
protected static final String HIVESERVER = "/hiveserver";
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_streaming_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
protected static final String HUDI_HIVE_SYNC_BUNDLE =