Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docker/demo/hive-table-check.commands
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ show partitions stock_ticks_cow;
show partitions stock_ticks_mor_ro;
show partitions stock_ticks_mor_rt;

show create table stock_ticks_cow;
show create table stock_ticks_mor_ro;
show create table stock_ticks_mor_rt;
show create table stock_ticks_cow_bs;
show create table stock_ticks_mor_bs_ro;
show create table stock_ticks_mor_bs_rt;

!quit


Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
Expand All @@ -49,11 +50,13 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -137,16 +140,14 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSpli
}

// Return parquet file with a list of log files in the same file group.
public static Map<HoodieBaseFile, List<String>> groupLogsByBaseFile(Configuration conf, List<HoodieBaseFile> fileStatuses) {
Map<Path, List<HoodieBaseFile>> partitionsToParquetSplits =
fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent()));
public static List<Pair<Option<HoodieBaseFile>, List<String>>> groupLogsByBaseFile(Configuration conf, List<Path> partitionPaths) {
Set<Path> partitionSet = new HashSet<>(partitionPaths);
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionsToParquetSplits.keySet());
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);

// for all unique split parents, obtain all delta files based on delta commit timeline,
// grouped on file id
Map<HoodieBaseFile, List<String>> resultMap = new HashMap<>();
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
    // Get all the base file and its log file pairs in the required partition paths.
List<Pair<Option<HoodieBaseFile>, List<String>>> baseAndLogsList = new ArrayList<>();
partitionSet.forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
Expand All @@ -161,28 +162,18 @@ public static Map<HoodieBaseFile, List<String>> groupLogsByBaseFile(Configuratio
.map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
.orElse(Stream.empty());

// subgroup splits again by file id & match with log files.
Map<String, List<HoodieBaseFile>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
.collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName())));
latestFileSlices.forEach(fileSlice -> {
List<HoodieBaseFile> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
resultMap.put(split, logFilePaths);
} catch (Exception e) {
throw new HoodieException("Error creating hoodie real time split ", e);
}
});
baseAndLogsList.add(Pair.of(fileSlice.getBaseFile(), logFilePaths));
});
} catch (Exception e) {
throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});
return resultMap;
return baseAndLogsList;
}


/**
* Add a field to the existing fields projected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ private void testHiveAfterFirstBatch() throws Exception {
assertStdOutContains(stdOutErrPair,
"| partition |\n+----------------+\n| dt=2018-08-31 |\n+----------------+\n", 3);

    // There should be 5 data source tables (all except stock_ticks_mor_bs_rt).
    // Once [HUDI-2071] is resolved, we can increase the expected count from 5 to 6.
assertStdOutContains(stdOutErrPair, "'spark.sql.sources.provider'='hudi'", 5);

stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
+ "| GOOG | 2018-08-31 10:29:00 |\n", 6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ object DataSourceWriteOptions {
// HIVE SYNC SPECIFIC CONFIGS
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
// unexpected issues with config getting reset

val HIVE_SYNC_ENABLED_OPT_KEY: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.enable")
.defaultValue("false")
Expand Down Expand Up @@ -441,16 +440,6 @@ object DataSourceWriteOptions {
.withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. " +
"Disabled by default for backward compatibility.")

val HIVE_TABLE_PROPERTIES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.table_properties")
.noDefaultValue()
.withDocumentation("")

val HIVE_TABLE_SERDE_PROPERTIES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.serde_properties")
.noDefaultValue()
.withDocumentation("")

val HIVE_SYNC_AS_DATA_SOURCE_TABLE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.hive_sync.sync_as_datasource")
.defaultValue("true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.log4j.LogManager
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
Expand Down Expand Up @@ -105,8 +106,14 @@ class DefaultSource extends RelationProvider
val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
val tableType = metaClient.getTableType
val queryType = parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue)
log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType")

    // First check whether ConfigUtils.IS_QUERY_AS_RO_TABLE has been set by HiveSyncTool;
    // otherwise, fall back to the query type from QUERY_TYPE_OPT_KEY.
val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue()))

log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")

(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ case class HoodieFileIndex(
}
}

private lazy val metadataConfig = {
val properties = new Properties()
properties.putAll(options.asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}

@transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
@transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
@transient @volatile private var cachedFileSize: Long = 0L
Expand Down Expand Up @@ -195,8 +201,8 @@ case class HoodieFileIndex(
* @param predicates The filter condition.
* @return The Pruned partition paths.
*/
private def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {
def prunePartition(partitionPaths: Seq[PartitionRowPath],
predicates: Seq[Expression]): Seq[PartitionRowPath] = {

val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
val partitionPruningPredicates = predicates.filter {
Expand All @@ -222,26 +228,13 @@ case class HoodieFileIndex(
}
}

/**
   * Load all partition paths and their files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
def getAllQueryPartitionPaths: Seq[PartitionRowPath] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val properties = new Properties()
properties.putAll(options.asJava)
val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build()

val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
// Load all the partition path from the basePath, and filter by the query partition path.
// TODO load files from the queryPartitionPath directly.
val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
.filter(_.startsWith(queryPartitionPath))

val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()
val maxListParallelism = writeConfig.getFileListingParallelism

val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionSchema = _partitionSchemaFromProperties
val timeZoneId = CaseInsensitiveMap(options)
.get(DateTimeUtils.TIMEZONE_OPTION)
Expand All @@ -250,7 +243,7 @@ case class HoodieFileIndex(
val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark
.sessionState.conf)
// Convert partition path to PartitionRowPath
val partitionRowPaths = partitionPaths.map { partitionPath =>
partitionPaths.map { partitionPath =>
val partitionRow = if (partitionSchema.fields.length == 0) {
// This is a non-partitioned table
InternalRow.empty
Expand Down Expand Up @@ -308,7 +301,20 @@ case class HoodieFileIndex(
}
PartitionRowPath(partitionRow, partitionPath)
}
}

/**
   * Load all partition paths and their files under the query table path.
*/
private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
val properties = new Properties()
properties.putAll(options.asJava)
val writeConfig = HoodieWriteConfig.newBuilder()
.withPath(basePath).withProperties(properties).build()

val maxListParallelism = writeConfig.getFileListingParallelism
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val partitionRowPaths = getAllQueryPartitionPaths
// List files in all of the partition path.
val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.hudi

import java.util
import java.util.Properties

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
Expand All @@ -36,24 +35,21 @@ import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

object HoodieSparkSqlWriter {

Expand Down Expand Up @@ -397,15 +393,15 @@ object HoodieSparkSqlWriter {
}
}

private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig): Boolean = {
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig)
private def syncHive(basePath: Path, fs: FileSystem, hoodieConfig: HoodieConfig, sqlConf: SQLConf): Boolean = {
val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, hoodieConfig, sqlConf)
val hiveConf: HiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
true
}

private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig): HiveSyncConfig = {
private def buildSyncConfig(basePath: Path, hoodieConfig: HoodieConfig, sqlConf: SQLConf): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
hiveSyncConfig.basePath = basePath.toString
hiveSyncConfig.baseFileFormat = hoodieConfig.getString(HIVE_BASE_FILE_FORMAT_OPT_KEY)
Expand All @@ -430,76 +426,14 @@ object HoodieSparkSqlWriter {
hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt

val syncAsDtaSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
if (syncAsDtaSourceTable) {
hiveSyncConfig.tableProperties = hoodieConfig.getStringOrDefault(HIVE_TABLE_PROPERTIES, null)
val serdePropText = createSqlTableSerdeProperties(hoodieConfig, basePath.toString)
val serdeProp = ConfigUtils.toMap(serdePropText)
serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key)
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)

hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
}
hiveSyncConfig.saveAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
hiveSyncConfig
}

/**
* Add Spark Sql related table properties to the HIVE_TABLE_PROPERTIES.
* @param sqlConf The spark sql conf.
* @param schema The schema to write to the table.
* @param hoodieConfig The HoodieConfig contains origin parameters.
* @return A new parameters added the HIVE_TABLE_PROPERTIES property.
*/
private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType,
hoodieConfig: HoodieConfig): HoodieConfig = {
// Convert the schema and partition info used by spark sql to hive table properties.
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

// Sync schema with meta fields
val schemaWithMetaFields = HoodieSqlUtils.addMetaFields(schema)
val partitionSet = hoodieConfig.getString(HIVE_PARTITION_FIELDS_OPT_KEY)
.split(",").map(_.trim).filter(!_.isEmpty).toSet
val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD)

val (partitionCols, dataCols) = schemaWithMetaFields.partition(c => partitionSet.contains(c.name))
val reOrderedType = StructType(dataCols ++ partitionCols)
val schemaParts = reOrderedType.json.grouped(threshold).toSeq

var properties = Map(
"spark.sql.sources.provider" -> "hudi",
"spark.sql.sources.schema.numParts" -> schemaParts.size.toString
)
schemaParts.zipWithIndex.foreach { case (part, index) =>
properties += s"spark.sql.sources.schema.part.$index" -> part
}
// add partition columns
if (partitionSet.nonEmpty) {
properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString
partitionSet.zipWithIndex.foreach { case (partCol, index) =>
properties += s"spark.sql.sources.schema.partCol.$index" -> partCol
}
}
var sqlPropertyText = ConfigUtils.configToString(properties)
sqlPropertyText = if (hoodieConfig.contains(HIVE_TABLE_PROPERTIES)) {
sqlPropertyText + "\n" + hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
} else {
sqlPropertyText
}
hoodieConfig.setValue(HIVE_TABLE_PROPERTIES, sqlPropertyText)
hoodieConfig
}

private def createSqlTableSerdeProperties(hoodieConfig: HoodieConfig, basePath: String): String = {
val pathProp = s"path=$basePath"
if (hoodieConfig.contains(HIVE_TABLE_SERDE_PROPERTIES)) {
pathProp + "\n" + hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
} else {
pathProp
}
}


private def metaSync(spark: SparkSession, hoodieConfig: HoodieConfig, basePath: Path,
schema: StructType): Boolean = {
Expand All @@ -508,7 +442,6 @@ object HoodieSparkSqlWriter {
var syncClientToolClassSet = scala.collection.mutable.Set[String]()
hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)

val newHoodieConfig = addSqlTableProperties(spark.sessionState.conf, schema, hoodieConfig)
// for backward compatibility
if (hiveSyncEnabled) {
metaSyncEnabled = true
Expand All @@ -521,12 +454,12 @@ object HoodieSparkSqlWriter {
val syncSuccess = impl.trim match {
case "org.apache.hudi.hive.HiveSyncTool" => {
log.info("Syncing to Hive Metastore (URL: " + hoodieConfig.getString(HIVE_URL_OPT_KEY) + ")")
syncHive(basePath, fs, newHoodieConfig)
syncHive(basePath, fs, hoodieConfig, spark.sessionState.conf)
true
}
case _ => {
val properties = new Properties()
properties.putAll(newHoodieConfig.getProps)
properties.putAll(hoodieConfig.getProps)
properties.put("basePath", basePath.toString)
val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
syncHoodie.syncHoodieTable()
Expand Down
Loading