@@ -18,10 +18,6 @@

package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -98,6 +94,11 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -110,6 +111,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -208,16 +210,24 @@ public boolean commit(String instantTime, O writeStatuses, Option<Map<String, St
return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
}

public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds,
Option.empty());
}

public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds);
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType) {
return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
}

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
// Skip the empty commit if not allowed
if (!config.allowEmptyCommit() && stats.isEmpty()) {
return true;
@@ -233,6 +243,9 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(inflightInstant, metadata);
if (extraPreCommitFunc.isPresent()) {
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
commit(table, commitActionType, instantTime, metadata, stats);
// already within lock, and so no lock required for archival
postCommit(table, metadata, instantTime, extraMetadata, false);
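To illustrate the API change in this first file: the new six-argument `commit` overload threads an optional `BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>` down into `commitStats`, which invokes it after `preCommit` and just before the commit is completed, while the previous five-argument overload now delegates with `Option.empty()`. A minimal sketch of a caller, in Scala against the Spark client, with placeholder names (`client`, `instantTime`, `writeStatuses`) and a hypothetical metadata key:

```scala
// A minimal sketch, not from the PR: `client` is an existing SparkRDDWriteClient,
// `instantTime` a started instant, `writeStatuses` a JavaRDD[WriteStatus], and the
// metadata key below is a hypothetical example.
import java.util.Collections
import java.util.function.BiConsumer

import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.Option

val extraPreCommitFn: BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] =
  (_: HoodieTableMetaClient, metadata: HoodieCommitMetadata) => {
    // Runs inside commitStats(), after preCommit() and before the commit is completed.
    metadata.addMetadata("example.pre.commit.key", "example-value")
  }

client.commit(
  instantTime,
  writeStatuses,
  Option.empty[java.util.Map[String, String]](),           // no extra metadata
  HoodieTimeline.COMMIT_ACTION,                            // commit action type
  Collections.emptyMap[String, java.util.List[String]](),  // no replaced file ids
  Option.of(extraPreCommitFn))
```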
@@ -73,6 +73,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
@@ -114,15 +115,17 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
}

@Override
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
// for eager flush, multiple write stat may share one file path.
List<HoodieWriteStat> merged = writeStats.stream()
.collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
.values().stream()
.map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
.collect(Collectors.toList());
return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
}

@Override
@@ -46,6 +46,7 @@

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@@ -82,9 +83,11 @@ public boolean commit(String instantTime,
List<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata,
String commitActionType,
Map<String, List<String>> partitionToReplacedFileIds) {
Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}

@Override
@@ -45,8 +45,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -71,6 +71,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

@SuppressWarnings("checkstyle:LineLength")
@@ -120,10 +121,11 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
*/
@Override
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
}

@Override
@@ -434,6 +434,16 @@ object DataSourceWriteOptions {
+ " within a streaming microbatch. Turning this on could hide write status errors while the Spark checkpoint moves ahead, "
+ "so users are recommended to use this with caution.")

val STREAMING_CHECKPOINT_IDENTIFIER: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.checkpoint.identifier")
.noDefaultValue()
.sinceVersion("0.13.0")
.withDocumentation("A stream identifier used by Hudi to fetch the right checkpoint (the `batch id`, to be more specific) "
+ "corresponding to this writer. Note that the identifier must be unique per writer in a multi-writer scenario. "
+ "If the value is not set, the checkpoint info is only kept in memory, which can lose the `batch id` "
+ "if the job restarts after a failed Spark checkpoint write, causing Spark to retry and rewrite the data.")

val META_SYNC_CLIENT_TOOL_CLASS_NAME: ConfigProperty[String] = ConfigProperty
.key("hoodie.meta.sync.client.tool.class")
.defaultValue(classOf[HiveSyncTool].getName)
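To show how the new `hoodie.datasource.write.streaming.checkpoint.identifier` option above might be wired into a Structured Streaming write, here is a rough sketch; only the checkpoint identifier key comes from this change, while the table name, key fields, paths and trigger are illustrative placeholders:

```scala
// A rough sketch, assuming a streaming DataFrame `streamingDf` and a String `basePath`;
// all option values except the new checkpoint identifier key are placeholders.
import org.apache.spark.sql.streaming.Trigger

streamingDf.writeStream
  .format("hudi")
  .option("hoodie.table.name", "hudi_streaming_tbl")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  // Keep this value unique per writer when several streaming writers target the same table.
  .option("hoodie.datasource.write.streaming.checkpoint.identifier", "ingestion-writer-1")
  .option("checkpointLocation", s"$basePath/.checkpoints/ingestion-writer-1")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start(s"$basePath/hudi_streaming_tbl")
```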
@@ -62,6 +62,7 @@ import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}

import java.util.function.BiConsumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
@@ -81,7 +82,8 @@ object HoodieSparkSqlWriter {
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty)
asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
: (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {

@@ -183,7 +185,7 @@ object HoodieSparkSqlWriter {
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
}
}
tableConfig = tableMetaClient.getTableConfig

val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
@@ -203,14 +205,14 @@

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
} else {
None
}
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
} else {
None
}
}

// NOTE: Target writer's schema is deduced based on
@@ -378,7 +380,7 @@ object HoodieSparkSqlWriter {
val (writeSuccessful, compactionInstant, clusteringInstant) =
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)

(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
}
@@ -567,7 +569,7 @@ object HoodieSparkSqlWriter {
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
@@ -885,7 +887,8 @@ object HoodieSparkSqlWriter {
client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
tableInstantInfo: TableInstantInfo,
extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]]
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
log.info("Proceeding to commit the write.")
@@ -895,7 +898,8 @@
client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))),
tableInstantInfo.commitActionType,
writeResult.getPartitionToReplaceFileIds)
writeResult.getPartitionToReplaceFileIds,
common.util.Option.ofNullable(extraPreCommitFn.orNull))

if (commitSuccess) {
log.info("Commit " + tableInstantInfo.instantTime + " successful!")
Expand Down Expand Up @@ -981,7 +985,7 @@ object HoodieSparkSqlWriter {
}

private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
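Finally, a sketch of how a Spark-side caller (for example, a streaming sink that wants its `batch id` recoverable from the timeline) might use the new `extraPreCommitFn` parameter, assuming it is threaded through the `write` entry point whose signature the hunks above extend; `sqlContext`, `optParams`, `df` and `batchId` are assumed to be in scope, and the metadata key is hypothetical:

```scala
// A sketch only; `sqlContext`, `optParams`, `df` and `batchId` are assumed to be in
// scope, and the checkpoint metadata key is a hypothetical placeholder.
import java.util.function.BiConsumer

import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.SaveMode

val stashBatchId: BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata] =
  (_: HoodieTableMetaClient, metadata: HoodieCommitMetadata) => {
    // Invoked by commitStats() right before the commit is completed, so the batch id
    // lands in the commit metadata and can later be read back from the timeline.
    metadata.addMetadata("streaming.batch.id", batchId.toString)
  }

val (writeSuccessful, commitInstant, _, _, _, _) =
  HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, optParams, df,
    extraPreCommitFn = Some(stashBatchId))
```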