Merged
Changes from all commits
@@ -176,7 +176,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
}

@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
public void dropPartitions(String tableName, List<String> partitionsToDrop) {
throw new UnsupportedOperationException("No support for dropPartitions yet.");
}

@@ -421,25 +421,6 @@ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAd
return latestSchema;
}


/**
* Get Last commit's Metadata.
*/
public Option<HoodieCommitMetadata> getLatestCommitMetadata() {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
HoodieInstant instant = timeline.lastInstant().get();
byte[] data = timeline.getInstantDetails(instant).get();
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to get commit metadata", e);
}
}

/**
* Read the parquet schema from a parquet File.
*/
@@ -1336,4 +1336,22 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
inflightAndCompletedPartitions.addAll(getCompletedMetadataPartitions(tableConfig));
return inflightAndCompletedPartitions;
}

/**
 * Get the last completed commit's metadata.
 */
public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
HoodieInstant instant = timeline.lastInstant().get();
byte[] data = timeline.getInstantDetails(instant).get();
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to get commit metadata", e);
}
}
Review comment from @xiarixiaoyao (Contributor), Apr 3, 2022:
Why move this method from TableSchemaResolver to this class?

Reply from the author:
@vinothchandar suggested extracting this out into a separate static helper.

}
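
To make the extraction discussed in the review thread above concrete, here is a minimal Scala sketch of the helper as a caller would now use it. The utility class hosting the new static method is not named in this capture, so the sketch re-implements the same logic in a standalone object rather than guessing the real class name.

import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.Option
import org.apache.hudi.exception.HoodieException

// Standalone sketch mirroring the static helper added in this hunk: callers no
// longer need a TableSchemaResolver instance, only a HoodieTableMetaClient.
object LatestCommitMetadataSketch {
  def latestCommitMetadata(metaClient: HoodieTableMetaClient): Option[HoodieCommitMetadata] = {
    try {
      val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
      if (timeline.lastInstant().isPresent) {
        val instant = timeline.lastInstant().get()
        val data = timeline.getInstantDetails(instant).get()
        Option.of(HoodieCommitMetadata.fromBytes(data, classOf[HoodieCommitMetadata]))
      } else {
        Option.empty[HoodieCommitMetadata]()
      }
    } catch {
      case e: Exception => throw new HoodieException("Failed to get commit metadata", e)
    }
  }
}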
@@ -177,12 +177,6 @@ public void addPartitionsToTable(final String tableName, final List<String> part
throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
}

@Override
public void dropPartitionsToTable(final String tableName, final List<String> partitionsToDrop) {
// bigQuery discovers the new partitions automatically, so do nothing.
throw new UnsupportedOperationException("No support for dropPartitionsToTable yet.");
}

public boolean datasetExists() {
Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
return dataset != null;
@@ -236,6 +230,12 @@ public void updatePartitionsToTable(final String tableName, final List<String> c
throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
}

@Override
public void dropPartitions(String tableName, List<String> partitionsToDrop) {
// bigQuery discovers the new partitions automatically, so do nothing.
throw new UnsupportedOperationException("No support for dropPartitions yet.");

Review comment from a Contributor:
Please do not delete the original code comment:
// bigQuery discovers the new partitions automatically, so do nothing.

}

@Override
public void close() {
// bigQuery has no connection close method, so do nothing.
@@ -110,6 +110,11 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
*/
lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

/**
 * The base file format of the table.
 */
lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()
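// e.g. "PARQUET" or "ORC" (the HoodieFileFormat enum name); consumed later in this
// diff by buildHiveSyncConfig in ProvidesHoodieConfig. Hedged note, not part of the PR.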

/**
* The schema of table.
* Make StructField nullable and fill the comments in.
@@ -17,30 +17,38 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import java.util
import java.util.Locale

import scala.collection.JavaConverters._

trait ProvidesHoodieConfig extends Logging {

def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
val sparkSession: SparkSession = hoodieCatalogTable.spark
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
val tableId = hoodieCatalogTable.table.identifier

// NOTE: Here we fallback to "" to make sure that null value is not overridden with
// default value ("ts")
@@ -51,6 +59,10 @@ trait ProvidesHoodieConfig extends Logging {
s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
val enableHive = isEnableHive(sparkSession)

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)

val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, catalogProperties) {
Map.apply(
"path" -> hoodieCatalogTable.tableLocation,
@@ -63,15 +75,14 @@
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
HIVE_TABLE.key -> tableId.table,
HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
.filter { case(_, v) => v != null }
@@ -98,10 +109,12 @@
val path = hoodieCatalogTable.tableLocation
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val catalogProperties = hoodieCatalogTable.catalogProperties

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf, extraOptions)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
val parameters = withSparkConf(sparkSession, options)()
val parameters = withSparkConf(sparkSession, catalogProperties)()

val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")

@@ -161,7 +174,7 @@

val enableHive = isEnableHive(sparkSession)

withSparkConf(sparkSession, options) {
withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
@@ -177,20 +190,124 @@
PAYLOAD_CLASS_NAME.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
HIVE_PARTITION_FIELDS.key -> partitionFieldsStr,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
.filter { case (_, v) => v != null }
}
}

def buildHoodieDropPartitionsConfig(
sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
partitionsToDrop: String): Map[String, String] = {
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
val enableHive = isEnableHive(sparkSession)
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> hoodieCatalogTable.tableLocation,
TBL_NAME.key -> hoodieCatalogTable.tableName,
TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
PARTITIONS_TO_DELETE.key -> partitionsToDrop,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
PARTITIONPATH_FIELD.key -> partitionFields,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
)
.filter { case (_, v) => v != null }
}
}
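// Hedged usage sketch (the caller shape is illustrative, not taken from this PR):
//   val opts = buildHoodieDropPartitionsConfig(spark, hoodieCatalogTable, "dt=2021-10-01")
//   spark.emptyDataFrame.write.format("hudi").options(opts).mode("append").save()
// i.e. an empty-DataFrame write that runs the delete_partition operation for the given
// partition spec and triggers meta sync when Hive sync is enabled.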

def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession): Map[String, String] = {
val path = hoodieCatalogTable.tableLocation
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase(Locale.ROOT))
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
Review comment from a Contributor:
.toLowerCase(Locale.ROOT)
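// A hedged reading of the note above, applied to the partitionSchema line (sketch only,
// not part of the merged diff):
//   val partitionSchema = StructType(
//     tableSchema.filter(f => partitionColumns.contains(f.name.toLowerCase(Locale.ROOT))))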


assert(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operation")

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
TBL_NAME.key -> tableConfig.getTableName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
}
}

def getHoodieProps(catalogProperties: Map[String, String], tableConfig: HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] = Map.empty): TypedProperties = {
val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ conf.getAllConfs ++ extraOptions
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options)
hoodieConfig.getProps
}
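// Hedged note on the ++ chain above: for duplicate keys the right-hand map wins, so
// extraOptions override the session SQL conf, which overrides the table config
// (hoodie.properties), which in turn overrides the catalog properties.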

def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
hiveSyncConfig.basePath = hoodieCatalogTable.tableLocation
hiveSyncConfig.baseFileFormat = hoodieCatalogTable.baseFileFormat
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
hiveSyncConfig.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
if (props.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) {
hiveSyncConfig.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
} else {
hiveSyncConfig.tableName = hoodieCatalogTable.table.identifier.table
}
hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
hiveSyncConfig.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
hiveSyncConfig.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
hiveSyncConfig.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
hiveSyncConfig.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
hiveSyncConfig.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
hiveSyncConfig.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
hiveSyncConfig.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
hiveSyncConfig.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
hiveSyncConfig.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
hiveSyncConfig.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
hiveSyncConfig.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
hiveSyncConfig.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
else null
if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
hiveSyncConfig.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
hiveSyncConfig
}
}
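
To round out the change, a hedged end-to-end sketch of the SQL path these helpers serve. The session settings, table name, schema, and partition value are illustrative and not taken from this PR; the statements follow Hudi's documented Spark SQL usage.

import org.apache.spark.sql.SparkSession

object DropPartitionSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-drop-partition-sketch")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    // Illustrative partitioned Hudi table ("h0" is a made-up name).
    spark.sql(
      """create table h0 (id int, name string, price double, dt string)
        |using hudi
        |partitioned by (dt)
        |tblproperties (primaryKey = 'id')""".stripMargin)
    spark.sql("insert into h0 values (1, 'a1', 10.0, '2021-10-01')")

    // The statement this PR wires up: dropping a partition is expected to build its write
    // config via buildHoodieDropPartitionsConfig (delete_partition operation) and, when
    // Hive sync is enabled, to call the renamed dropPartitions() on the sync client.
    spark.sql("alter table h0 drop partition (dt = '2021-10-01')")

    spark.stop()
  }
}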