@@ -319,6 +319,11 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "lowest and best effort file sizing. "
+ "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads");

public static final ConfigProperty<String> BULK_INSERT_WRITE_STREAM_ENABLE = ConfigProperty
.key("hoodie.bulkinsert.write.stream")
.defaultValue("false")
.withDocumentation("When enabled, bulk insert writes a `writeStream` dataset through the row-writer path instead of converting it to an RDD");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_ENABLE = ConfigProperty
.key("hoodie.embed.timeline.server")
.defaultValue("true")
@@ -1126,6 +1131,10 @@ public BulkInsertSortMode getBulkInsertSortMode() {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}

public boolean isBulkInsertWriteStreamEnabled() {
return getBoolean(BULK_INSERT_WRITE_STREAM_ENABLE);
}

public boolean isMergeDataValidationCheckEnabled() {
return getBoolean(MERGE_DATA_VALIDATION_CHECK_ENABLE);
}
@@ -2439,6 +2448,11 @@ public Builder withBulkInsertSortMode(String mode) {
return this;
}

public Builder withBulkInsertWriteStreamEnabled(boolean bulkInsertWriteStreamEnabled) {
writeConfig.setValue(BULK_INSERT_WRITE_STREAM_ENABLE, String.valueOf(bulkInsertWriteStreamEnabled));
return this;
}

public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
writeConfig.setValue(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE, String.valueOf(allow));
return this;
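
For illustration, a hedged sketch (not part of this change) of turning the new flag on, either through the builder method added above or as a write option on a streaming bulk insert; basePath, df, and checkpointDir are assumed placeholders:

import org.apache.hudi.config.HoodieWriteConfig

// Builder path, using the withBulkInsertWriteStreamEnabled(...) method added above.
val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath(basePath)                          // basePath: assumed table base path
  .withBulkInsertWriteStreamEnabled(true)
  .build()

// Datasource-option path on a streaming bulk insert, combining the new key with existing Hudi options.
df.writeStream
  .format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .option("hoodie.bulkinsert.write.stream", "true")
  .option("checkpointLocation", checkpointDir)  // checkpointDir: assumed
  .start(basePath)
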
@@ -243,7 +243,7 @@ public void deleteInstantFileIfExists(HoodieInstant instant) {
}
}

protected void deleteInstantFile(HoodieInstant instant) {
public void deleteInstantFile(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName());
try {
@@ -264,6 +264,14 @@ public Option<byte[]> getInstantDetails(HoodieInstant instant) {
return readDataFromPath(detailPath);
}

public HoodieInstant getCompletedInstantForTimestamp(String timestamp) {
return filterCompletedInstants()
.getInstants()
.filter(i -> timestamp.equals(i.getTimestamp()))
.findFirst()
.orElseThrow(() -> new HoodieIOException(String.format("No completed instant found for timestamp %s", timestamp)));
}

/**
* Returns most recent instant having valid schema in its {@link HoodieCommitMetadata}
*/
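
A brief caller-side sketch of the new timestamp lookup, assuming a HoodieTableMetaClient named metaClient is already built (the metaClient setup is not part of this diff):

import org.apache.hudi.common.table.timeline.HoodieInstant

// Resolves the completed instant for a given commit time; throws HoodieIOException when none matches.
val instant: HoodieInstant = metaClient.getActiveTimeline
  .getCompletedInstantForTimestamp("20220314091500")  // illustrative timestamp
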
@@ -31,14 +31,19 @@

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.package$;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -186,4 +191,28 @@ public static Dataset<Row> prepareHoodieDatasetForBulkInsertWithoutMetaFields(Da
JavaConverters.collectionAsScalaIterableConverter(allCols).asScala().toSeq());
}

public static List<InternalRow> toInternalRows(Dataset<Row> rows) throws Exception {
// Note: collectAsList() below materializes the entire dataset on the driver.
ExpressionEncoder encoder = SparkRowWriteHelper.getEncoder(rows.schema());
List<InternalRow> toReturn = new ArrayList<>();
List<Row> rowList = rows.collectAsList();
for (Row row : rowList) {
toReturn.add(serializeRow(encoder, row).copy());
}
return toReturn;
}

private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
// TODO remove reflection if Spark 2.x support is dropped
if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
return (InternalRow) spark2method.invoke(encoder, row);
} else {
Class<?> serializerClass = Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer");
Object serializer = encoder.getClass().getMethod("createSerializer").invoke(encoder);
Method aboveSpark2method = serializerClass.getMethod("apply", Object.class);
return (InternalRow) aboveSpark2method.invoke(serializer, row);
}
}

}
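
For context, a hedged Scala sketch of the non-reflective equivalents of serializeRow when compiling against a single Spark line (the reflection above exists only so one artifact can serve both Spark 2 and Spark 3):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Spark 3.x: the encoder exposes an explicit serializer object.
def serializeRowSpark3(encoder: ExpressionEncoder[Row], row: Row): InternalRow =
  encoder.createSerializer().apply(row)

// Spark 2.x equivalent (does not compile against Spark 3): encoder.toRow(row)
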
@@ -19,6 +19,7 @@
package org.apache.hudi;

import org.apache.hudi.common.model.HoodieRecord;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
@@ -29,13 +30,14 @@
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;

import java.util.List;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;

/**
* Helper class to assist in deduplicating Rows for BulkInsert with Rows.
*/
@@ -62,7 +64,7 @@ public Dataset<Row> deduplicateRows(Dataset<Row> inputDf, String preCombineField
.map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, getEncoder(inputDf.schema()));
}

private ExpressionEncoder getEncoder(StructType schema) {
static ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
@@ -42,6 +42,7 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriterFactory
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -177,7 +178,19 @@ object HoodieSparkSqlWriter {
operation == WriteOperationType.BULK_INSERT) {
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime, partitionColumns)
return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)

// Check for errors and commit the write.
/*val (writeSuccessful, compactionInstant, clusteringInstant) =
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))

(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)*/

return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]], tableConfig)
}
// scalastyle:on

@@ -563,11 +576,51 @@ object HoodieSparkSqlWriter {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}
if (HoodieSparkUtils.isSpark2) {
hoodieDF.write.format("org.apache.hudi.internal")
if (writeConfig.isBulkInsertWriteStreamEnabled) {
// init internal writer HoodieDataSourceInternalWriter
val internalRows = HoodieDatasetBulkInsertHelper.toInternalRows(hoodieDF)
// val writer = HoodieInternalRowFileWriterFactory.getInternalRowFileWriterWithoutMetaFields(basePath,)
val writerHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, hoodieDF.schema,
sqlContext.sparkSession, sparkContext.hadoopConfiguration, DataSourceUtils.getExtraMetadata(params))

val hoodieTable = writerHelper.getHoodieTable
val writer = HoodieInternalRowFileWriterFactory.getInternalRowFileWriterWithoutMetaFields(
basePath,
hoodieTable,
writeConfig,
hoodieDF.schema)
/*val internalWriter = new HoodieDataSourceInternalWriter(
instantTime,
writeConfig,
hoodieDF.schema,
sqlContext.sparkSession,
sparkContext.hadoopConfiguration,
DataSourceUtils.getExtraMetadata(params),
populateMetaFields,
arePartitionRecordsSorted)
internalRows.foreach(row => internalWriter.createWriterFactory()
.createDataWriter(0, Random.nextLong, Random.nextLong)
.write(row))*/
internalRows.foreach(row => writer.writeRow(row))

/*val query = hoodieDF.writeStream.trigger(Trigger.ProcessingTime(2000))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format("org.apache.hudi")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(params)
.mode(SaveMode.Append)
.save()
batchDF.unpersist()
}.start()
query.awaitTermination(2000L)*/
} else {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(params)
.mode(SaveMode.Append)
.save()
}
} else if (HoodieSparkUtils.isSpark3) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
@@ -583,6 +636,63 @@
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

def bulkInsertStreamAsRow(sqlContext: SQLContext,
parameters: Map[String, String],
df: DataFrame,
tblName: String,
path: String,
partitionColumns: String): Dataset[Row] = {
val sparkContext = sqlContext.sparkContext
val populateMetaFields = java.lang.Boolean.parseBoolean((parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))))
val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
if (dropPartitionColumns) {
schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
}
validateSchemaForHoodieIsDeleted(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
}
else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
}
} else {
// Sort modes are not yet supported when meta fields are disabled
new NonSortPartitionerWithRows()
}
val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
val isGlobalIndex = if (populateMetaFields) {
SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
} else {
false
}
val hoodieDF = if (populateMetaFields) {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
bulkInsertPartitionerRows, isGlobalIndex, dropPartitionColumns)
} else {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}
hoodieDF
}
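// Hypothetical wiring, not part of this diff: the frame prepared by bulkInsertStreamAsRow could be handed to
// the same row-writer datasource used in the Spark 2 branch above; batchDf and instantTime are assumed to be
// in the caller's scope.
// val prepared = bulkInsertStreamAsRow(sqlContext, parameters, batchDf, tblName, path, partitionColumns)
// prepared.write.format("org.apache.hudi.internal")
//   .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
//   .options(parameters)
//   .mode(SaveMode.Append)
//   .save()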

private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {