@@ -1,13 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.hudi.catalog
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
@@ -76,18 +75,19 @@ trait ProvidesHoodieConfig extends Logging {

/**
* Build the default config for insert.
*
* @return
*/
def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {
def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {

if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
throw new IllegalArgumentException(s"Insert partition fields" +
s"[${insertPartitions.keys.mkString(" " )}]" +
s"[${insertPartitions.keys.mkString(" ")}]" +
s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
}
val path = hoodieCatalogTable.tableLocation
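The hunks above establish the shared trait: buildHoodieInsertConfig now lives on ProvidesHoodieConfig (package org.apache.spark.sql.hudi) so the SQL commands changed below can mix it in instead of keeping private copies. The following is a minimal, self-contained sketch of that mix-in pattern; the stand-in types, method body, and sample paths are invented for illustration, and the option keys are spelled out as the literal strings the DataSourceWriteOptions constants resolve to.

// Minimal sketch of the mix-in refactor (stand-in types, illustrative body).
// The real trait method takes HoodieCatalogTable, SparkSession, overwrite flags, etc.
trait ProvidesHoodieConfigSketch {
  def buildInsertConfig(tablePath: String,
                        tableType: String,
                        isOverwrite: Boolean,
                        extraOptions: Map[String, String]): Map[String, String] = {
    val operation = if (isOverwrite) "insert_overwrite_table" else "insert"
    Map(
      "path" -> tablePath,
      "hoodie.datasource.write.table.type" -> tableType,
      "hoodie.datasource.write.operation" -> operation
    ) ++ extraOptions
  }
}

// Commands that previously carried private copies of this logic now just mix the
// trait in, as InsertIntoHoodieTableCommand and UpdateHoodieTableCommand do below:
object InsertCommandSketch extends ProvidesHoodieConfigSketch {
  def insertOptions(): Map[String, String] =
    buildInsertConfig("/tmp/hudi/t1", "COPY_ON_WRITE", isOverwrite = false, Map.empty)
}

object UpdateCommandSketch extends ProvidesHoodieConfigSketch {
  def updateOptions(): Map[String, String] =
    buildInsertConfig("/tmp/hudi/t1", "COPY_ON_WRITE", isOverwrite = false,
      Map("hoodie.datasource.write.operation" -> "upsert"))
}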
@@ -17,26 +17,17 @@

package org.apache.spark.sql.hudi.command

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

import scala.collection.JavaConverters._

/**
* Command for insert into hoodie table.
*/
@@ -56,7 +47,7 @@ case class InsertIntoHoodieTableCommand(
}
}

object InsertIntoHoodieTableCommand extends Logging {
object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
/**
* Run the insert query. We support both dynamic partition insert and static partition insert.
* @param sparkSession The spark session.
@@ -173,112 +164,4 @@ object InsertIntoHoodieTableCommand extends Logging {
val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects
Project(alignedProjects, query)
}

/**
* Build the default config for insert.
* @return
*/
private def buildHoodieInsertConfig(
hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {

if (insertPartitions.nonEmpty &&
(insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
throw new IllegalArgumentException(s"Insert partition fields" +
s"[${insertPartitions.keys.mkString(" " )}]" +
s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]")
}
val path = hoodieCatalogTable.tableLocation
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema

val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
val parameters = withSparkConf(sparkSession, options)()

val preCombineColumn = hoodieCatalogTable.preCombineKey.getOrElse("")
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")

val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)

val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean

val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPreCombineColumn = preCombineColumn.nonEmpty
val operation =
(enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, false, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
case (true, true, _, _, true) =>
throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
case (true, _, true, _, _) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite table
case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// insert overwrite partition
case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
// disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
case (false, false, false, false, _) if hasPreCombineColumn => UPSERT_OPERATION_OPT_VAL
// if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
// for the rest case, use the insert operation
case _ => INSERT_OPERATION_OPT_VAL
}

val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
// Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
// on reading.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")

val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, options) {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
TBL_NAME.key -> hoodieCatalogTable.tableName,
PRECOMBINE_FIELD.key -> preCombineColumn,
OPERATION.key -> operation,
HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PARTITIONPATH_FIELD.key -> partitionFields,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPreCombineColumn),
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_FIELDS.key -> partitionFields,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
}
}
}
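The largest deleted span above is the duplicated buildHoodieInsertConfig, whose trickiest part is the write-operation selection over (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable). Below is a standalone restatement of that decision table, keeping the case order of the removed code; the wrapper object, the simplified error messages, and the literal operation strings (what the *_OPERATION_OPT_VAL constants resolve to) are the only parts not lifted from the diff.

object WriteOperationSelectionSketch {
  // Simplified restatement of the operation-selection match from the deleted
  // buildHoodieInsertConfig (the same logic now lives on ProvidesHoodieConfig).
  def chooseWriteOperation(enableBulkInsert: Boolean,
                           isOverwrite: Boolean,
                           dropDuplicate: Boolean,
                           isNonStrictMode: Boolean,
                           isPartitionedTable: Boolean,
                           hasPreCombineColumn: Boolean): String =
    (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
      case (true, _, _, false, _) =>
        throw new IllegalArgumentException("Table with primaryKey can not use bulk insert outside non-strict mode")
      case (true, true, _, _, true) =>
        throw new IllegalArgumentException("Insert overwrite partition can not use bulk insert")
      case (true, _, true, _, _) =>
        throw new IllegalArgumentException("Bulk insert cannot support drop duplication")
      case (true, true, _, _, false)                               => "bulk_insert"
      case (false, true, _, _, false)                              => "insert_overwrite_table"
      case (_, true, _, _, true)                                   => "insert_overwrite"
      case (false, false, false, false, _) if hasPreCombineColumn  => "upsert"
      case (true, _, _, true, _)                                   => "bulk_insert"
      case _                                                       => "insert"
    }

  def main(args: Array[String]): Unit = {
    // A non-overwrite insert into a partitioned table with a preCombine field,
    // in strict mode and without bulk insert, resolves to an upsert:
    assert(chooseWriteOperation(
      enableBulkInsert = false, isOverwrite = false, dropDuplicate = false,
      isNonStrictMode = false, isPartitionedTable = true, hasPreCombineColumn = true) == "upsert")
  }
}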
@@ -17,25 +17,21 @@

package org.apache.spark.sql.hudi.command

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructField

import scala.collection.JavaConverters._

case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeafRunnableCommand
with SparkAdapterSupport {
with SparkAdapterSupport with ProvidesHoodieConfig {

private val table = updateTable.table
private val tableId = getTableIdentifier(table)
@@ -71,7 +67,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeaf
df = df.filter(Column(updateTable.condition.get))
}
df = df.select(projects: _*)
val config = buildHoodieConfig(sparkSession)
val config = buildHoodieConfig(HoodieCatalogTable(sparkSession, tableId))
df.write
.format("hudi")
.mode(SaveMode.Append)
@@ -82,42 +78,6 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeaf
Seq.empty[Row]
}

private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig

val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
assert(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key in table $tableId, cannot execute update operator")
val enableHive = isEnableHive(sparkSession)

withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> hoodieCatalogTable.tableLocation,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> preCombineColumn,
TBL_NAME.key -> hoodieCatalogTable.tableName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
HIVE_TABLE.key -> tableId.table,
HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
}
}

def cast(exp:Expression, field: StructField, sqlConf: SQLConf): Expression = {
castIfNeeded(exp, field.dataType, sqlConf)
}
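On the update side, the only call-site change is the signature: the removed private buildHoodieConfig(sparkSession) resolved the table internally, while the trait version takes the already-resolved HoodieCatalogTable(sparkSession, tableId). A small self-contained sketch of that dependency inversion follows, with stand-in types in place of the Spark and Hudi classes; everything here other than the before/after shape is invented for illustration.

// Stand-ins for TableIdentifier / HoodieCatalogTable; not Spark or Hudi classes.
final case class TableId(database: String, table: String)
final case class ResolvedTable(id: TableId, location: String, primaryKeys: Seq[String])

object ResolvedTable {
  // stand-in for HoodieCatalogTable(sparkSession, tableId)
  def resolve(id: TableId): ResolvedTable =
    ResolvedTable(id, s"/warehouse/${id.database}.db/${id.table}", Seq("id"))
}

trait ProvidesConfigSketch {
  // After the refactor: the resolved table is an explicit parameter, so the
  // trait method needs no SparkSession and no command-local tableId field.
  def buildUpdateConfig(table: ResolvedTable): Map[String, String] =
    Map(
      "path" -> table.location,
      "hoodie.datasource.write.recordkey.field" -> table.primaryKeys.mkString(","),
      "hoodie.datasource.write.operation" -> "upsert"
    )
}

object UpdateCommandCallSiteSketch extends ProvidesConfigSketch {
  def run(id: TableId): Map[String, String] = {
    val resolved = ResolvedTable.resolve(id)  // done once by the command itself
    buildUpdateConfig(resolved)               // mirrors buildHoodieConfig(HoodieCatalogTable(...))
  }
}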
@@ -28,9 +28,9 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelpe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath}
import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table, ProvidesHoodieConfig}
import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table}
import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableCommand}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}

@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.sources.{Filter, InsertableRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap