@@ -21,6 +21,7 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator

import org.apache.log4j.LogManager

/**
@@ -65,7 +66,7 @@ object DataSourceReadOptions {
* This eases migration from old configs to new configs.
*/
def translateViewTypesToQueryTypes(optParams: Map[String, String]) : Map[String, String] = {
val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> QUERY_TYPE_READ_OPTIMIZED_OPT_VAL,
Member:
Don't think this change is necessary, right? The RO view does map to the snapshot query for COW. We may need to have two maps, one for COW and one for MOR.

Member Author:
I get confused by the naming sometimes...
For a COW table, snapshot view = read optimized view.
For MOR, snapshot view and read optimized view are different things.
With bootstrap, we will have one more view.
Can we define, regardless of table type: read optimized view -> parquet only (including bootstrap); snapshot view -> parquet (including bootstrap) merged with the logs?

Member Author:
Let's keep this mapping because we should be able to do RO view on MOR.

Member:
No... there are no more views. We did a renaming exercise to clear things up as "query types", and with that there should be no confusion; our docs are consistent with this as well. On COW there is in fact no RO view, so this change has to be done differently if you need it for MOR.

Member Author:
Sorry, my previous comments are confusing; let me rephrase.
What I am trying to do here is to not change the query behavior. Before this, we did not support snapshot queries for MOR, so the RO and snapshot query types behaved the same regardless of whether the table is COW or MOR.
If we don't make this change to the mapping, users will see different behavior after upgrading to the next release. If they are using VIEW_TYPE_READ_OPTIMIZED_OPT_VAL (deprecated) on MOR in their code, after upgrading to the next release the code will run a snapshot query instead of an RO query. This could surprise users, even though the key was deprecated.

Member:
> If they are using VIEW_TYPE_READ_OPTIMIZED_OPT_VAL (deprecated) on MOR in their code, after upgrade to the next release, the code will run snapshot query instead of RO query.

We have been logging a warning for some time on the use of the deprecated configs, so I think it's fair to do the right thing here going forward and call this out in the release notes. Let me push some changes.

VIEW_TYPE_INCREMENTAL_OPT_VAL -> QUERY_TYPE_INCREMENTAL_OPT_VAL,
VIEW_TYPE_REALTIME_OPT_VAL -> QUERY_TYPE_SNAPSHOT_OPT_VAL)
if (optParams.contains(VIEW_TYPE_OPT_KEY) && !optParams.contains(QUERY_TYPE_OPT_KEY)) {
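The thread above leaves two directions open: keep mapping the deprecated RO view key to a snapshot query, or make the translation table-type aware ("two maps for COW and MOR"). A minimal sketch of the table-type-aware variant might look like the following; the helper name, the tableType parameter, and the enum match are assumptions for illustration only, while the option constants and the fallback check come from the existing code:

```scala
import org.apache.hudi.common.model.HoodieTableType

// Hypothetical sketch (not what this PR merges): translate the deprecated view-type
// configs into query types, taking the table type into account.
def translateViewTypesForTableType(optParams: Map[String, String],
                                   tableType: HoodieTableType): Map[String, String] = {
  // The deprecated RO view maps differently per table type: on COW it was effectively
  // a snapshot query, on MOR it should stay a read-optimized query.
  val roQueryType = tableType match {
    case HoodieTableType.COPY_ON_WRITE => QUERY_TYPE_SNAPSHOT_OPT_VAL
    case HoodieTableType.MERGE_ON_READ => QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
  }
  val translation = Map(
    VIEW_TYPE_READ_OPTIMIZED_OPT_VAL -> roQueryType,
    VIEW_TYPE_INCREMENTAL_OPT_VAL -> QUERY_TYPE_INCREMENTAL_OPT_VAL,
    VIEW_TYPE_REALTIME_OPT_VAL -> QUERY_TYPE_SNAPSHOT_OPT_VAL)
  // Same fallback as the existing method: only translate when the deprecated key is
  // set and the new query-type key is not.
  if (optParams.contains(VIEW_TYPE_OPT_KEY) && !optParams.contains(QUERY_TYPE_OPT_KEY)) {
    optParams ++ Map(QUERY_TYPE_OPT_KEY -> translation(optParams(VIEW_TYPE_OPT_KEY)))
  } else {
    optParams
  }
}
```

A caller would need to know the table type up front, which is why the DefaultSource change below builds the HoodieTableMetaClient before dispatching.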
63 changes: 44 additions & 19 deletions hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,7 +18,9 @@
package org.apache.hudi

import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.{HoodieException, TableNotFoundException}
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
@@ -58,26 +60,28 @@ class DefaultSource extends RelationProvider
throw new HoodieException("'path' must be specified.")
}

// Try to create hoodie table meta client from the give path
// TODO: Smarter path handling
val metaClient = try {
val conf = sqlContext.sparkContext.hadoopConfiguration
Option(new HoodieTableMetaClient(conf, path.get, true))
Member:
Wouldn't this be problematic if the path is a glob and not the actual basePath of the table? A COW/snapshot query can, for example, do this, and I think we should handle the same for MOR as well.

Member Author:
At this point we have:

  • RO and snapshot queries for COW: support both glob and basePath
  • Snapshot for MOR: only supports basePath
  • Incremental: only supports basePath

What I am trying to do here is:

  • If the path contains a glob, fall back to RO. This is the current behavior; creating the metaClient will throw an exception, but it is handled below.
  • If the path is the basePath, we create the metaClient. For a COW table, go to the RO relation; for MOR, go to the snapshot relation.

Member:
> Snapshot for MOR: only supports basePath

Let me think about this more. We need to support some form of globbing for MOR/Snapshot query.

Member Author:
Udit's PR has this path handling. Should we merge part of his PR first? https://github.com/apache/hudi/pull/1702/files#diff-9a21766ebf794414f94b302bcb968f41R31
With this, we can let users call .load(basePath) or .load(basePath + "/*/*") for COW, MOR and incremental.

} catch {
case e: HoodieException => Option.empty
Member:
Can we just error out there?

Member Author:
I used this as a flag that the path is not the basePath. This is a temporary solution so as not to change the query behavior.
This will be handled better with https://github.com/apache/hudi/pull/1702/files#diff-9a21766ebf794414f94b302bcb968f41R31

}

if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
sqlContext.sparkContext.hadoopConfiguration.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])

log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
"Please query the Hive table registered using Spark SQL.")
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = parameters)
.resolveRelation()
if (metaClient.isDefined && metaClient.get.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
new SnapshotRelation(sqlContext, path.get, optParams, schema, metaClient.get)
} else {
getReadOptimizedView(sqlContext, parameters, schema)
}
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
getReadOptimizedView(sqlContext, parameters, schema)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
new IncrementalRelation(sqlContext, path.get, optParams, schema)
if (metaClient.isEmpty) {
throw new TableNotFoundException(path.get)
}
new IncrementalRelation(sqlContext, path.get, optParams, schema, metaClient.get)
} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
}
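For context on the dispatch above, here is how it would typically be exercised from the Spark side. This is only a usage illustration: the table path is hypothetical, while the option keys (including BEGIN_INSTANTTIME_OPT_KEY for incremental reads) are existing DataSourceReadOptions constants, and "hudi" is the shortName registered below.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.hudi.DataSourceReadOptions._

val spark = SparkSession.builder().appName("hudi-read").getOrCreate()
val basePath = "/tmp/hudi_trips" // hypothetical table location

// Snapshot query: on a MOR table this now resolves to SnapshotRelation (base files
// merged with log files); on COW it stays the filtered parquet relation.
val snapshotDf = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(basePath)

// Read-optimized query: plain parquet relation behind HoodieROTablePathFilter.
// Globbed paths still work here because no meta client is required.
val roDf = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(basePath + "/*/*")

// Incremental query: requires the real basePath; otherwise the meta client cannot be
// created and a TableNotFoundException is thrown.
val incrementalDf = spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME_OPT_KEY, "20200101000000")
  .load(basePath)
```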
@@ -123,4 +127,25 @@ class DefaultSource extends RelationProvider
}

override def shortName(): String = "hudi"

private def getReadOptimizedView(sqlContext: SQLContext,
Member:
We can rename this to something like getFilteredBaseFileRelation(). Again, we don't want to bring the view nomenclature back into the code.

Member Author:
Sure, will do.

optParams: Map[String, String],
schema: StructType): BaseRelation = {
log.warn("Loading Read Optimized view.")
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
sqlContext.sparkContext.hadoopConfiguration.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])

log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = optParams)
.resolveRelation()
}
}
@@ -43,11 +43,11 @@ import scala.collection.mutable
class IncrementalRelation(val sqlContext: SQLContext,
val basePath: String,
val optParams: Map[String, String],
val userSchema: StructType) extends BaseRelation with TableScan {
val userSchema: StructType,
val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {

private val log = LogManager.getLogger(classOf[IncrementalRelation])

private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
// MOR tables not supported yet
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
139 changes: 139 additions & 0 deletions hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.{HoodieParquetInputFormat, HoodieROTablePathFilter}
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable

import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

import java.util
import scala.collection.JavaConverters._

/**
* This is the Spark DataSourceV1 relation to read Hudi MOR table.
Member:
Actually, this is what we use for both COW and MOR; see the comment above. It would be best to keep the behavior for COW the same.

Member Author:
This is not as efficient as the default DataSource.apply().resolveRelation() yet, because filter pushdown and column pruning are not supported yet. But we will get there soon...

* @param sqlContext
* @param basePath
* @param optParams
* @param userSchema
*/
class SnapshotRelation(val sqlContext: SQLContext,
val basePath: String,
val optParams: Map[String, String],
val userSchema: StructType,
val metaClient: HoodieTableMetaClient) extends BaseRelation with PrunedFilteredScan {

private val log = LogManager.getLogger(classOf[SnapshotRelation])
private val conf = sqlContext.sparkContext.hadoopConfiguration

// Load Hudi table
private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
private val commitTimeline = hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
if (commitTimeline.empty()) {
throw new HoodieException("No Valid Hudi timeline exists")
}
private val completedCommitTimeline = hoodieTable.getMetaClient.getCommitsTimeline.filterCompletedInstants()
private val lastInstant = completedCommitTimeline.lastInstant().get()

// Set config for listStatus() in HoodieParquetInputFormat
conf.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])
conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive", "true")
conf.setStrings("hoodie.realtime.last.commit", lastInstant.getTimestamp)

private val hoodieInputFormat = new HoodieParquetInputFormat
hoodieInputFormat.setConf(conf)

// List all parquet files
private val fileStatus = hoodieInputFormat.listStatus(new JobConf(conf))

val (parquetPaths, parquetWithLogPaths) = if (lastInstant.getAction.equals(HoodieTimeline.COMMIT_ACTION)
|| lastInstant.getAction.equals(HoodieTimeline.COMPACTION_ACTION)) {
(fileStatus.map(f => f.getPath.toString).toList, Map.empty[String, String])
} else {
val fileGroups = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, util.Arrays.stream(fileStatus)).asScala
// Split the file group to: parquet file without a matching log file, parquet file need to merge with log files
val parquetPaths: List[String] = fileGroups.filter(p => p._2.size() == 0).keys.toList
val parquetWithLogPaths: Map[String, String] = fileGroups
.filter(p => p._2.size() > 0)
.map{ case(k, v) => (k, v.asScala.toList.mkString(","))}
.toMap
(parquetPaths, parquetWithLogPaths)
}

if (log.isDebugEnabled) {
log.debug("Stand alone parquet files: \n" + parquetPaths.mkString("\n"))
log.debug("Parquet files that have matching log files: \n" + parquetWithLogPaths.map(m => s"${m._1}:${m._2}").mkString("\n"))
}

// Add log file map to options
private val finalOps = optParams ++ parquetWithLogPaths

// use schema from latest metadata, if not present, read schema from the data file
private val latestSchema = {
val schemaUtil = new TableSchemaResolver(metaClient)
val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
}

override def schema: StructType = latestSchema

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
if (parquetWithLogPaths.isEmpty) {
sqlContext
.read
.options(finalOps)
.schema(schema)
.format("parquet")
.load(parquetPaths:_*)
.selectExpr(requiredColumns:_*)
.rdd
} else {
val regularParquet = sqlContext
.read
.options(finalOps)
.schema(schema)
.format("parquet")
.load(parquetPaths:_*)
// Hudi parquet files needed to merge with log file
sqlContext
.read
.options(finalOps)
.schema(schema)
.format("org.apache.spark.sql.execution.datasources.parquet.HoodieParquetRealtimeFileFormat")
.load(parquetWithLogPaths.keys.toList: _*)
.union(regularParquet)
.rdd
}
}
}
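On the efficiency point in the thread above: buildScan currently ignores the filters argument and only prunes columns in the log-free branch. A hypothetical helper (not part of this PR; the name is made up) that applies the pushed-down columns and a small subset of the filters to the loaded DataFrame could look like this; anything it does not translate is simply skipped, since Spark re-evaluates all filters on top of the returned rows anyway:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, LessThan}

// Hypothetical helper that would live inside SnapshotRelation: apply pushed-down
// column pruning and a few simple filter types to a DataFrame before .rdd is called.
def pruneAndFilter(df: DataFrame,
                   requiredColumns: Array[String],
                   filters: Array[Filter]): DataFrame = {
  val filtered = filters.foldLeft(df) {
    case (acc, EqualTo(attr, value))     => acc.filter(acc.col(attr) === value)
    case (acc, GreaterThan(attr, value)) => acc.filter(acc.col(attr) > value)
    case (acc, LessThan(attr, value))    => acc.filter(acc.col(attr) < value)
    case (acc, _)                        => acc // unsupported filters: leave to Spark
  }
  filtered.selectExpr(requiredColumns: _*)
}
```

Both branches of buildScan would then end with pruneAndFilter(df, requiredColumns, filters).rdd, so the parquet-only and the merged read paths prune and filter consistently.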