@@ -353,6 +353,9 @@ object DataSourceWriteOptions {
val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions"
val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"
val HIVE_TABLE_PROPERTIES = "hoodie.datasource.hive_sync.table_properties"
Contributor: Let's introduce an additional boolean property hoodie.datasource.hive_sync.sync_as_datasource and put the feature behind it. We can default it to true, but at least there would be a way to turn it off. This changes the way Spark SQL queries currently run against Hudi, so it is a big change.

Author: Good suggestion! +1 for this!
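For illustration, a minimal sketch of how a writer could turn the feature off once the flag exists, assuming the usual Hudi Spark datasource write path (the DataFrame, table name, record key, and path below are made up for this example):

import org.apache.spark.sql.SaveMode

// Hedged sketch: keep Hive sync enabled but disable syncing as a Spark datasource table.
df.write.format("hudi").
  option("hoodie.table.name", "test_hoodie").
  option("hoodie.datasource.write.recordkey.field", "_row_key").
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.sync_as_datasource", "false"). // new flag, defaults to true
  mode(SaveMode.Append).
  save("/tmp/hoodie_test")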

val HIVE_TABLE_SERDE_PROPERTIES = "hoodie.datasource.hive_sync.serde_properties"
val HIVE_SYNC_AS_DATA_SOURCE_TABLE = "hoodie.datasource.hive_sync.sync_as_datasource"

// DEFAULT FOR HIVE SPECIFIC CONFIGS
val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
@@ -372,6 +375,7 @@ object DataSourceWriteOptions {
val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false"
val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"
val DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE = "true"

// Async Compaction - Enabled by default for MOR
val ASYNC_COMPACT_ENABLE_OPT_KEY = "hoodie.datasource.compaction.async.enable"
@@ -36,6 +36,7 @@ import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.sync.common.AbstractSyncTool
@@ -44,7 +45,10 @@ import org.apache.spark.SPARK_VERSION
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
@@ -220,7 +224,8 @@ private[hudi] object HoodieSparkSqlWriter {

// Check for errors and commit the write.
val (writeSuccessful, compactionInstant) =
commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig, jsc,
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))

def unpersistRdd(rdd: RDD[_]): Unit = {
@@ -305,7 +310,7 @@ private[hudi] object HoodieSparkSqlWriter {
} finally {
writeClient.close()
}
val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration)
val metaSyncSuccess = metaSync(sqlContext.sparkSession, parameters, basePath, df.schema)
metaSyncSuccess
}

@@ -346,12 +351,13 @@ private[hudi] object HoodieSparkSqlWriter {
}
val hiveSyncEnabled = params.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val metaSyncEnabled = params.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) {
metaSync(params, basePath, sqlContext.sparkContext.hadoopConfiguration)
val syncHiveSuccess =
if (hiveSyncEnabled || metaSyncEnabled) {
metaSync(sqlContext.sparkSession, parameters, basePath, df.schema)
} else {
true
}
(syncHiveSucess, common.util.Option.ofNullable(instantTime))
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

def toProperties(params: Map[String, String]): TypedProperties = {
@@ -398,7 +404,7 @@ private[hudi] object HoodieSparkSqlWriter {
private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
hiveSyncConfig.basePath = basePath.toString
hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY);
hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY)
hiveSyncConfig.usePreApacheInputFormat =
parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
@@ -417,35 +423,95 @@ hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean

val syncAsDataSourceTable = parameters.getOrElse(DataSourceWriteOptions.HIVE_SYNC_AS_DATA_SOURCE_TABLE,
DataSourceWriteOptions.DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
if (syncAsDataSourceTable) {
hiveSyncConfig.tableProperties = parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, basePath.toString)
}
hiveSyncConfig
}

private def metaSync(parameters: Map[String, String],
basePath: Path,
hadoopConf: Configuration): Boolean = {
/**
* Add Spark SQL related table properties to HIVE_TABLE_PROPERTIES.
* @param sqlConf The Spark SQL conf.
* @param schema The schema written to the table.
* @param parameters The original parameters.
* @return A new parameter map with the HIVE_TABLE_PROPERTIES property added.
*/
private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
parameters: Map[String, String]): Map[String, String] = {
// Convert the schema and partition info used by spark sql to hive table properties.
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

val partitionSet = parameters(HIVE_PARTITION_FIELDS_OPT_KEY)
.split(",").map(_.trim).filter(!_.isEmpty).toSet
val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)

val (partitionCols, dataCols) = schema.partition(c => partitionSet.contains(c.name))
val reOrderedType = StructType(dataCols ++ partitionCols)
val schemaParts = reOrderedType.json.grouped(threshold).toSeq

var properties = Map(
"spark.sql.sources.provider" -> "hudi",
Contributor: @pengzhiwei2018 hello, how does Spark SQL decide to treat this as a datasource table? I see we only set the table properties here, not the "ROW FORMAT SERDE".

Author: Hi @lw309637554, if the table properties contain spark.sql.sources.provider and the schema/partition properties, Spark treats the table as a datasource table. The PR also sets the serde properties in createSqlTableSerdeProperties, and those are used by the Spark datasource table as well (a sketch of the resulting properties follows the helper methods below).

"spark.sql.sources.schema.numParts" -> schemaParts.size.toString
Contributor: Could we persist these properties to the metadata table rather than the Hive table properties?

Author: Spark needs these properties when it loads the table metadata from the Hive metastore, so we have to store them there.

)
schemaParts.zipWithIndex.foreach { case (part, index) =>
properties += s"spark.sql.sources.schema.part.$index" -> part
}
// add partition columns
if (partitionSet.nonEmpty) {
properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString
partitionSet.zipWithIndex.foreach { case (partCol, index) =>
properties += s"spark.sql.sources.schema.partCol.$index" -> partCol
}
}
var sqlPropertyText = ConfigUtils.configToString(properties)
sqlPropertyText = if (parameters.containsKey(HIVE_TABLE_PROPERTIES)) {
sqlPropertyText + "\n" + parameters(HIVE_TABLE_PROPERTIES)
} else {
sqlPropertyText
}
parameters + (HIVE_TABLE_PROPERTIES -> sqlPropertyText)
}

private def createSqlTableSerdeProperties(parameters: Map[String, String], basePath: String): String = {
val pathProp = s"path=$basePath"
if (parameters.containsKey(HIVE_TABLE_SERDE_PROPERTIES)) {
pathProp + "\n" + parameters(HIVE_TABLE_SERDE_PROPERTIES)
} else {
pathProp
}
}
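To make the review discussion above concrete, here is a hedged sketch of the strings these two helpers hand to HiveSyncConfig for the example used in the unit test further down (a table under /tmp/hoodie_test with one partition column named partition; the schema JSON is abbreviated):

// Illustrative only -- the full strings are asserted in "Test build sync config for spark sql".
// tableProperties is what lets Spark resolve the synced Hive table as a datasource table.
val tableProperties =
  """spark.sql.sources.provider=hudi
    |spark.sql.sources.schema.partCol.0=partition
    |spark.sql.sources.schema.numParts=1
    |spark.sql.sources.schema.numPartCols=1
    |spark.sql.sources.schema.part.0={"type":"struct","fields":[...]}""".stripMargin
// serdeProperties carries the base path that Spark's datasource resolution reads back.
val serdeProperties = "path=/tmp/hoodie_test"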

private def metaSync(spark: SparkSession, parameters: Map[String, String], basePath: Path,
schema: StructType): Boolean = {
val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
var metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
var syncClientToolClassSet = scala.collection.mutable.Set[String]()
parameters(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)

val newParameters = addSqlTableProperties(spark.sessionState.conf, schema, parameters)
// for backward compatibility
if (hiveSyncEnabled) {
metaSyncEnabled = true
syncClientToolClassSet += classOf[HiveSyncTool].getName
}
var metaSyncSuccess = true
if (metaSyncEnabled) {
val fs = basePath.getFileSystem(hadoopConf)
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
syncClientToolClassSet.foreach(impl => {
val syncSuccess = impl.trim match {
case "org.apache.hudi.hive.HiveSyncTool" => {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
syncHive(basePath, fs, parameters)
syncHive(basePath, fs, newParameters)
true
}
case _ => {
val properties = new Properties();
properties.putAll(parameters)
properties.putAll(newParameters)
properties.put("basePath", basePath.toString)
val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
syncHoodie.syncHoodieTable()
@@ -463,7 +529,9 @@ private[hudi] object HoodieSparkSqlWriter {
*/
case class TableInstantInfo(basePath: Path, instantTime: String, commitActionType: String, operation: WriteOperationType)

private def commitAndPerformPostOperations(writeResult: HoodieWriteResult,
private def commitAndPerformPostOperations(spark: SparkSession,
schema: StructType,
writeResult: HoodieWriteResult,
parameters: Map[String, String],
client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
tableConfig: HoodieTableConfig,
@@ -497,7 +565,8 @@
}

log.info(s"Compaction Scheduled is $compactionInstant")
val metaSyncSuccess = metaSync(parameters, tableInstantInfo.basePath, jsc.hadoopConfiguration())

val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema)

log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
if (!asyncCompactionEnabled) {
@@ -22,17 +22,21 @@ import java.util
import java.util.{Collections, Date, UUID}

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
@@ -486,6 +490,46 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
})

test("Test build sync config for spark sql") {
initSparkContext("test build sync config")
val addSqlTablePropertiesMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties",
classOf[SQLConf], classOf[StructType], classOf[Map[_,_]])
addSqlTablePropertiesMethod.setAccessible(true)

val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val basePath = "/tmp/hoodie_test"
val params = Map(
"path" -> basePath,
DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition"
)
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params)
val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter,
spark.sessionState.conf, structType, parameters)
.asInstanceOf[Map[String, String]]

val buildSyncConfigMethod =
HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path],
classOf[Map[_,_]])
buildSyncConfigMethod.setAccessible(true)

val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter,
new Path(basePath), newParams).asInstanceOf[HiveSyncConfig]

assertResult("spark.sql.sources.provider=hudi\n" +
"spark.sql.sources.schema.partCol.0=partition\n" +
"spark.sql.sources.schema.numParts=1\n" +
"spark.sql.sources.schema.numPartCols=1\n" +
"spark.sql.sources.schema.part.0=" +
"{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
Contributor: Ideally it would be nice to construct the expected output rather than hardcoding it. Can we at least use structType to build the schema parts in this expected output? It would be good to avoid hardcoding (a sketch of this appears after the test below).

"{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
"{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
Contributor: Do you think we need to assert HIVE_TABLE_PROPERTIES as well?


assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
}
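Following the reviewer's suggestion above, a hedged sketch of the kind of assertion that could replace the hardcoded string inside the test body (structType and hiveSyncConfig refer to the values already built in the test; this is illustrative, not the committed code):

// Rebuild the expected schema JSON the same way addSqlTableProperties does --
// move the partition column to the end -- then check it lands in the generated properties.
val partitionCols = Set("partition")
val (partCols, dataCols) = structType.partition(f => partitionCols.contains(f.name))
val expectedSchemaJson = StructType(dataCols ++ partCols).json
assert(hiveSyncConfig.tableProperties.contains(
  s"spark.sql.sources.schema.part.0=$expectedSchemaJson"))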

case class Test(uuid: String, ts: Long)

import scala.collection.JavaConverters
@@ -19,6 +19,7 @@
package org.apache.hudi.dla;

import com.beust.jcommander.JCommander;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
@@ -149,14 +150,14 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi
if (!useRealTimeInputFormat) {
String inputFormatClassName = HoodieParquetInputFormat.class.getName();
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
} else {
// Custom serde will not work with ALTER TABLE REPLACE COLUMNS
// https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
// /ql/exec/DDLTask.java#L3488
String inputFormatClassName = HoodieParquetRealtimeInputFormat.class.getName();
hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(),
ParquetHiveSerDe.class.getName());
ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>());
}
} else {
// Check if the table schema has evolved
@@ -101,9 +101,12 @@ private void createDLAConnection() {
}

@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties) {
try {
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass);
String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(),
inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties);
LOG.info("Creating table with " + createSQLQuery);
updateDLASQL(createSQLQuery);
} catch (IOException e) {
@@ -88,6 +88,12 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system")
public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE;

@Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
public String tableProperties;

@Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
public String serdeProperties;

Contributor (on lines +91 to +96): Can you update the toString() in this class?

Author: Yes, thanks for reminding me.

@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

@@ -114,32 +120,36 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
newConfig.supportTimestamp = cfg.supportTimestamp;
newConfig.decodePartition = cfg.decodePartition;
newConfig.tableProperties = cfg.tableProperties;
newConfig.serdeProperties = cfg.serdeProperties;
return newConfig;
}

@Override
public String toString() {
return "HiveSyncConfig{"
+ "databaseName='" + databaseName + '\''
+ ", tableName='" + tableName + '\''
+ ", baseFileFormat='" + baseFileFormat + '\''
+ ", hiveUser='" + hiveUser + '\''
+ ", hivePass='" + hivePass + '\''
+ ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\''
+ ", partitionFields=" + partitionFields
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+ ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
+ ", useJdbc=" + useJdbc
+ ", autoCreateDatabase=" + autoCreateDatabase
+ ", ignoreExceptions=" + ignoreExceptions
+ ", skipROSuffix=" + skipROSuffix
+ ", help=" + help
+ ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition
+ ", useFileListingFromMetadata=" + useFileListingFromMetadata
+ ", verifyMetadataFileListing=" + verifyMetadataFileListing
+ '}';
+ "databaseName='" + databaseName + '\''
+ ", tableName='" + tableName + '\''
+ ", baseFileFormat='" + baseFileFormat + '\''
+ ", hiveUser='" + hiveUser + '\''
+ ", hivePass='" + hivePass + '\''
+ ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\''
+ ", partitionFields=" + partitionFields
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+ ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
+ ", useJdbc=" + useJdbc
+ ", autoCreateDatabase=" + autoCreateDatabase
+ ", ignoreExceptions=" + ignoreExceptions
+ ", skipROSuffix=" + skipROSuffix
+ ", useFileListingFromMetadata=" + useFileListingFromMetadata
+ ", verifyMetadataFileListing=" + verifyMetadataFileListing
+ ", tableProperties='" + tableProperties + '\''
+ ", serdeProperties='" + serdeProperties + '\''
+ ", help=" + help
+ ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition
+ '}';
}
}
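For completeness, a hedged sketch of wiring the two new fields when a HiveSyncConfig is built by hand, mirroring what buildSyncConfig in HoodieSparkSqlWriter does (the database, table, and path values are illustrative):

// Illustrative only: both fields take newline-separated key=value pairs,
// the same format ConfigUtils.configToString produces.
val cfg = new HiveSyncConfig()
cfg.databaseName = "default"
cfg.tableName = "test_hoodie"
cfg.basePath = "/tmp/hoodie_test"
cfg.tableProperties = "spark.sql.sources.provider=hudi"
cfg.serdeProperties = "path=/tmp/hoodie_test"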