@@ -162,18 +162,6 @@ object HoodieOptionConfig {
.toMap
}

/**
* Get the primary key from the table options.
* @param options
* @return
*/
def getPrimaryColumns(options: Map[String, String]): Array[String] = {
val params = mapSqlOptionsToDataSourceWriteConfigs(options)
params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
.map(_.split(",").filter(_.nonEmpty))
.getOrElse(Array.empty)
}

/**
* Get the table type from the table options.
* @param options

Large diffs are not rendered by default.

@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils
@@ -92,13 +93,11 @@ case class CreateHoodieTableAsSelectCommand(
hoodieCatalogTable.initHoodieTable()

val tableProperties = hoodieCatalogTable.catalogProperties
// NOTE: Users might be specifying write-configuration (inadvertently) as options or table properties
// in CTAS, therefore we need to make sure that these are appropriately propagated to the
// write operation
val options = tableProperties ++ Map(
val options = Map(
HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tableProperties.asJava),
HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(updatedTable.properties.asJava),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> "false",
DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
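For reference, the behavior these options target is exercised by the tests added further down in this diff: a CTAS like the hypothetical one below (table name and columns made up, assuming a Spark session with the Hudi SQL extensions enabled) is expected to be written through a BULK_INSERT commit, with the query result inserted as-is since pre-combine is disabled.

// Illustrative only, mirroring the TestCreateTable cases later in this diff; names are hypothetical.
spark.sql(
  """
    | create table ctas_demo using hudi
    | tblproperties (primaryKey = 'id')
    | location '/tmp/ctas_demo'
    | as
    | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
    """.stripMargin)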
@@ -20,11 +20,10 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -38,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -491,8 +491,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val targetTableDb = targetTableIdentify.database.getOrElse("default")
val targetTableName = targetTableIdentify.identifier
val path = hoodieCatalogTable.tableLocation
// force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in MergeIntoHoodieTableCommand
val catalogProperties = hoodieCatalogTable.catalogProperties + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
@@ -503,10 +501,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
// TODO(HUDI-3456) clean up
val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)

withSparkConf(sparkSession, catalogProperties) {
withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
@@ -525,10 +522,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,

// NOTE: We have to explicitly override following configs to make sure no schema validation is performed
// as schema of the incoming dataset might be diverging from the table's schema (full schemas'
@@ -539,7 +534,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
RECONCILE_SCHEMA.key -> "false",
"hoodie.datasource.write.schema.canonicalize" -> "false"
)
.filter { case (_, v) => v != null }
}
}
}
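The withCombinedOptions helper used above is imported from ProvidesHoodieConfig, presumably the file whose large diff is not rendered earlier on this page. As a rough sketch of the intent inferred from the call sites here and in HiveSyncProcedure (not the actual implementation), it appears to merge the session's Hudi SQL conf, the table config, and the catalog properties with the per-command options, replacing the previous getHoodieProps + withSparkConf pattern. The snippet below is a minimal stand-in: plain Maps replace the real HoodieCatalogTable/HoodieTableConfig/SQLConf types, and the later-wins precedence order is an assumption.

// Hedged sketch only -- not the actual ProvidesHoodieConfig.withCombinedOptions.
// Plain Maps stand in for HoodieCatalogTable, HoodieTableConfig and SQLConf,
// and the later-wins precedence is an assumption inferred from the call sites.
object CombinedOptionsSketch {

  def withCombinedOptions(sessionConf: Map[String, String],
                          tableConfig: Map[String, String],
                          catalogProperties: Map[String, String])
                         (commandOptions: Map[String, String]): Map[String, String] = {
    // only hoodie.* keys from the session conf are relevant to the write
    val sessionHoodieConf = sessionConf.filter { case (key, _) => key.startsWith("hoodie.") }
    // later operands override earlier ones: command-level options win
    sessionHoodieConf ++ tableConfig ++ catalogProperties ++ commandOptions
  }

  // Example: the command-level operation overrides the session-level one.
  val combined: Map[String, String] = withCombinedOptions(
    sessionConf = Map("hoodie.datasource.write.operation" -> "upsert"),
    tableConfig = Map("hoodie.table.name" -> "h0"),
    catalogProperties = Map("hoodie.datasource.write.precombine.field" -> "ts"))(
    Map("hoodie.datasource.write.operation" -> "bulk_insert"))
}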
@@ -84,8 +84,7 @@ class HiveSyncProcedure extends BaseProcedure with ProcedureBuilder
hiveConf.addResource(hadoopConf)

val tableConfig = hoodieCatalogTable.tableConfig
val hoodieProps = getHoodieProps(hoodieCatalogTable.catalogProperties, tableConfig, sqlConf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)

var hiveSyncTool: HiveSyncTool = null
try {
@@ -23,11 +23,14 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.checkMessageContains
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
@@ -144,8 +147,11 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
try {
spark.sql(sql)
} catch {
case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) => hasException = true
case f: Throwable => fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f)
case e: Throwable if checkMessageContains(e, errorMsg) || checkMessageContains(getRootCause(e), errorMsg) =>
hasException = true

case f: Throwable =>
fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f)
}
assertResult(true)(hasException)
}
@@ -219,3 +225,19 @@
}
}
}

object HoodieSparkSqlTestBase {

def getLastCommitMetadata(spark: SparkSession, tablePath: String) = {
val metaClient = HoodieTableMetaClient.builder()
.setConf(spark.sparkContext.hadoopConfiguration)
.setBasePath(tablePath)
.build()

metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getRight
}

private def checkMessageContains(e: Throwable, text: String): Boolean =
e.getMessage.trim.contains(text.trim)

}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
import org.apache.hudi.config.HoodieWriteConfig
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator,
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertFalse

@@ -299,6 +300,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| AS
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin)

assertResult(WriteOperationType.BULK_INSERT) {
getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName1").getOperationType
}
checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a1", 10.0, 1000)
)
@@ -318,6 +323,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
""".stripMargin
)

assertResult(WriteOperationType.BULK_INSERT) {
getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName2").getOperationType
}
checkAnswer(s"select id, name, price, dt from $tableName2")(
Seq(1, "a1", 10, "2021-04-01")
)
Expand Down Expand Up @@ -356,9 +365,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| price
""".stripMargin
)

assertResult(WriteOperationType.BULK_INSERT) {
getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName3").getOperationType
}
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
Seq(1, "a1", 10, "2021-05-06 00:00:00")
)

// Create table with date type partition
val tableName4 = generateTableName
spark.sql(
@@ -375,6 +389,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| price
""".stripMargin
)

assertResult(WriteOperationType.BULK_INSERT) {
getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName4").getOperationType
}
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
Seq(1, "a1", 10, "2021-05-06")
)
@@ -22,11 +22,13 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.WriteOperationType
import org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata

import java.io.File

@@ -725,13 +727,20 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")

assertResult(WriteOperationType.BULK_INSERT) {
getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType
}
checkAnswer(s"select id, name, price, dt from $tableName")(
Seq(1, "a1", 10.0, "2021-07-18")
)

// Disable the bulk insert
spark.sql("set hoodie.sql.bulk.insert.enable = false")
spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')")

assertResult(WriteOperationType.INSERT) {
getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType
}
checkAnswer(s"select id, name, price, dt from $tableName order by id")(
Seq(1, "a1", 10.0, "2021-07-18"),
Seq(2, "a2", 10.0, "2021-07-18")