@@ -71,6 +71,30 @@ public static SparkHoodieIndex createIndex(HoodieWriteConfig config) {
}
}

/**
* Returns whether the configured index is a global one.
* @param config HoodieWriteConfig to use.
* @return {@code true} if the index is global, {@code false} otherwise.
*/
public static boolean isGlobalIndex(HoodieWriteConfig config) {
switch (config.getIndexType()) {
case HBASE:
return true;
case INMEMORY:
return true;
case BLOOM:
return false;
case GLOBAL_BLOOM:
return true;
case SIMPLE:
return false;
case GLOBAL_SIMPLE:
return true;
default:
return createIndex(config).isGlobal();
}
}

@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
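For reference, a minimal sketch of how a caller might consult the new helper before choosing a deduplication key. The builder calls follow the usual HoodieWriteConfig/HoodieIndexConfig API, but the exact methods and the table path here are illustrative assumptions and may vary across Hudi versions.

```java
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex;

public class GlobalIndexCheckSketch {
  public static void main(String[] args) {
    // Assumed builder usage: configure a global index type on the write config.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/global_index_demo")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
            .build())
        .build();

    // For the index types enumerated in the switch above, no index instance is created;
    // unrecognized types fall back to createIndex(config).isGlobal().
    boolean isGlobal = SparkHoodieIndex.isGlobalIndex(config);

    // A global index implies records are unique by record key alone, so deduplication
    // can key on the record key; otherwise partition path + record key is used.
    System.out.println("Dedup on record key only: " + isGlobal);
  }
}
```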
@@ -18,6 +18,12 @@

package org.apache.hudi;

import static org.apache.spark.sql.functions.callUDF;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -35,16 +41,8 @@
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.collection.JavaConverters;

import static org.apache.spark.sql.functions.callUDF;

/**
* Helper class to assist in preparing {@link Dataset<Row>}s for bulk insert with datasource implementation.
*/
@@ -69,7 +67,8 @@ public class HoodieDatasetBulkInsertHelper {
*/
public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows) {
BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows,
boolean isGlobalIndex) {
List<Column> originalFields =
Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList());

@@ -100,9 +99,15 @@ public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlConte
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType));

Dataset<Row> dedupedDf = rowDatasetWithHoodieColumns;
if (config.shouldCombineBeforeInsert()) {
dedupedDf = SparkRowWriteHelper.newInstance().deduplicateRows(rowDatasetWithHoodieColumns, config.getPreCombineField(), isGlobalIndex);
}

List<Column> orderedFields = Stream.concat(HoodieRecord.HOODIE_META_COLUMNS.stream().map(Column::new),
originalFields.stream()).collect(Collectors.toList());
Dataset<Row> colOrderedDataset = rowDatasetWithHoodieColumns.select(
Dataset<Row> colOrderedDataset = dedupedDf.select(
JavaConverters.collectionAsScalaIterableConverter(orderedFields).asScala().toSeq());

return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism());
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi;

import org.apache.hudi.common.model.HoodieRecord;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;

import java.util.List;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;

/**
* Helper class to assist in deduplicating Rows for BulkInsert with Rows.
*/
public class SparkRowWriteHelper {

private SparkRowWriteHelper() {
}

private static class WriteHelperHolder {
private static final SparkRowWriteHelper SPARK_WRITE_HELPER = new SparkRowWriteHelper();
}

public static SparkRowWriteHelper newInstance() {
return SparkRowWriteHelper.WriteHelperHolder.SPARK_WRITE_HELPER;
}

public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField, boolean isGlobalIndex) {
ExpressionEncoder encoder = getEncoder(inputDf.schema());

return inputDf.groupByKey(
(MapFunction<Row, String>) value ->
isGlobalIndex ? (value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)) :
(value.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + value.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)), Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
if (((Comparable) v1.getAs(preCombineField)).compareTo(((Comparable) v2.getAs(preCombineField))) >= 0) {
return v1;
} else {
return v2;
}
}
).map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder);
}

private ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
}
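A hedged usage sketch of the new helper: given rows that already carry the Hudi meta columns, deduplicateRows groups by record key (global index) or by partition path plus record key (non-global) and keeps the row with the largest value in the pre-combine field. The local SparkSession, schema, and sample values below are illustrative assumptions, not part of this change.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class DeduplicateRowsSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]").appName("dedup-sketch").getOrCreate();

    // Minimal schema: the two meta fields used for grouping plus "ts" as the pre-combine field.
    StructType schema = new StructType()
        .add("_hoodie_record_key", DataTypes.StringType)
        .add("_hoodie_partition_path", DataTypes.StringType)
        .add("ts", DataTypes.LongType);

    Dataset<Row> input = spark.createDataFrame(Arrays.asList(
        RowFactory.create("key1", "2021/01/01", 1L),
        RowFactory.create("key1", "2021/01/01", 5L),  // duplicate key, higher ts should win
        RowFactory.create("key2", "2021/01/02", 3L)),
        schema);

    // Non-global index: duplicates are resolved per (partition path + record key).
    Dataset<Row> deduped = SparkRowWriteHelper.newInstance()
        .deduplicateRows(input, "ts", false);

    deduped.show(false);  // expect two rows, with key1 resolved to ts = 5
    spark.stop();
  }
}
```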
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndex
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
@@ -345,8 +346,9 @@ object HoodieSparkSqlWriter {
}
val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted();
parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
val isGlobalIndex = SparkHoodieIndex.isGlobalIndex(writeConfig)
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
bulkInsertPartitionerRows)
bulkInsertPartitionerRows, isGlobalIndex)
if (SPARK_VERSION.startsWith("2.")) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
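End to end, this wiring is exercised when the row-writer bulk insert path runs with combine-before-insert enabled. A hedged sketch of such a write follows; the option keys are standard Hudi datasource options, while the schema, sample data, and paths are illustrative assumptions.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RowWriterBulkInsertSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]").appName("row-writer-bulk-insert").getOrCreate();

    StructType schema = new StructType()
        .add("_row_key", DataTypes.StringType)
        .add("partition", DataTypes.StringType)
        .add("ts", DataTypes.LongType);

    Dataset<Row> df = spark.createDataFrame(Arrays.asList(
        RowFactory.create("key1", "2021/01/01", 1L),
        RowFactory.create("key1", "2021/01/01", 5L),  // duplicate record key, higher ts
        RowFactory.create("key2", "2021/01/02", 3L)),
        schema);

    // Bulk insert through the row writer with combine-before-insert enabled, so
    // HoodieSparkSqlWriter resolves isGlobalIndex and HoodieDatasetBulkInsertHelper
    // deduplicates on "ts" before repartitioning.
    df.write().format("hudi")
        .option("hoodie.table.name", "dedup_demo")
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.row.writer.enable", "true")
        .option("hoodie.datasource.write.recordkey.field", "_row_key")
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.combine.before.insert", "true")
        .mode(SaveMode.Append)
        .save("/tmp/hoodie/dedup_demo");

    spark.stop();
  }
}
```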
@@ -25,15 +25,32 @@
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.avro.Schema;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,26 +62,35 @@
public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {

private String schemaStr;
private Schema schema;
private transient Schema schema;
private StructType structType;

public TestHoodieDatasetBulkInsertHelper() throws IOException {
init();
}

/**
* Args for the pre-combine parameterized test.
*/
private static Stream<Arguments> providePreCombineArgs() {
return Stream.of(
Arguments.of(false),
Arguments.of(true));
}

private void init() throws IOException {
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = DataSourceTestUtils.getStructTypeExampleSchema();
structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
}

@Test
public void testBulkInsertHelper() throws IOException {
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).build();
public void testBulkInsertHelper() {
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(false, false).build();
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", "testNamespace",
new NonSortPartitionerWithRows());
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testNamespace", new NonSortPartitionerWithRows(), false);
StructType resultSchema = result.schema();

assertEquals(result.count(), 10);
@@ -74,6 +100,42 @@ public void testBulkInsertHelper() throws IOException {
assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
}

result.toJavaRDD().foreach(entry -> {
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs("_row_key")));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition")));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
});

Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
assertTrue(dataset.except(trimmedOutput).count() == 0);
}

@ParameterizedTest
@MethodSource("providePreCombineArgs")
public void testBulkInsertPreCombine(boolean enablePreCombine) {
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet()).combineInput(enablePreCombine, enablePreCombine)
.withPreCombineField("ts").build();
List<Row> inserts = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> toUpdateDataset = sqlContext.createDataFrame(inserts.subList(0, 5), structType);
List<Row> updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset);
List<Row> rows = new ArrayList<>();
rows.addAll(inserts);
rows.addAll(updates);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testNamespace", new NonSortPartitionerWithRows(), false);
StructType resultSchema = result.schema();

assertEquals(result.count(), enablePreCombine ? 10 : 15);
assertEquals(resultSchema.fieldNames().length, structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size());

for (Map.Entry<String, Integer> entry : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
}

int metadataRecordKeyIndex = resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
int metadataParitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
int metadataCommitTimeIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -87,6 +149,30 @@ public void testBulkInsertHelper() throws IOException {
assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
assertTrue(entry.get(metadataFilenameIndex).equals(""));
});

Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);

// find resolved input snapshot
ExpressionEncoder encoder = getEncoder(dataset.schema());
if (enablePreCombine) {
Dataset<Row> inputSnapshotDf = dataset.groupByKey(
(MapFunction<Row, String>) value -> value.getAs("partition") + "+" + value.getAs("_row_key"), Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
long ts1 = v1.getAs("ts");
long ts2 = v2.getAs("ts");
if (ts1 >= ts2) {
return v1;
} else {
return v2;
}
})
.map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder);

assertTrue(inputSnapshotDf.except(trimmedOutput).count() == 0);
} else {
assertTrue(dataset.except(trimmedOutput).count() == 0);
}
}

private Map<String, String> getPropsAllSet() {
Expand Down Expand Up @@ -120,7 +206,7 @@ public void testNoPropsSet() {
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
try {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testNamespace", new NonSortPartitionerWithRows());
"testNamespace", new NonSortPartitionerWithRows(), false);
fail("Should have thrown exception");
} catch (Exception e) {
// ignore
@@ -131,7 +217,7 @@
dataset = sqlContext.createDataFrame(rows, structType);
try {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testNamespace", new NonSortPartitionerWithRows());
"testNamespace", new NonSortPartitionerWithRows(), false);
fail("Should have thrown exception");
} catch (Exception e) {
// ignore
@@ -142,7 +228,7 @@
dataset = sqlContext.createDataFrame(rows, structType);
try {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testNamespace", new NonSortPartitionerWithRows());
"testNamespace", new NonSortPartitionerWithRows(), false);
fail("Should have thrown exception");
} catch (Exception e) {
// ignore
@@ -153,10 +239,18 @@
dataset = sqlContext.createDataFrame(rows, structType);
try {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testNamespace", new NonSortPartitionerWithRows());
"testNamespace", new NonSortPartitionerWithRows(), false);
fail("Should have thrown exception");
} catch (Exception e) {
// ignore
}
}

private ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
}