@@ -65,6 +65,10 @@ public static Option<Path> getTablePath(FileSystem fs, Path path) throws HoodieE
return getTablePathFromPartitionPath(fs, directory);
}

public static Boolean isHoodieMetaPath(String path) {
Contributor: minor: isHoodieMeta"data"Path
return isInsideTableMetadataFolder(path) || isTableMetadataFolder(path);
}

private static boolean isTableMetadataFolder(String path) {
return path != null && path.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME);
}
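For context, a minimal Scala sketch of what the new isHoodieMetaPath helper matches; the example paths are hypothetical and HoodieTableMetaClient.METAFOLDER_NAME is assumed to resolve to ".hoodie":

import org.apache.hudi.common.util.TablePathUtils

val examples = Seq(
  "/warehouse/trips/.hoodie",                      // the metadata folder itself
  "/warehouse/trips/.hoodie/20210101.commit",      // a file inside the metadata folder
  "/warehouse/trips/2021/01/01/part-0001.parquet") // a regular data file

// prints true, true, false: only paths at or under the .hoodie folder count as metadata paths
examples.foreach(p => println(s"$p -> ${TablePathUtils.isHoodieMetaPath(p)}"))
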
@@ -138,7 +138,7 @@ public class HoodieTestDataGenerator {

//Maintains all the existing keys schema wise
private final Map<String, Map<Integer, KeyPartition>> existingKeysBySchema;
private final String[] partitionPaths;
private String[] partitionPaths;
//maintains the count of existing keys schema wise
private Map<String, Integer> numKeysBySchema;

@@ -805,6 +805,10 @@ public List<GenericRecord> generateGenericRecords(int numRecords) {
return list;
}

public void setPartitionPaths(String[] partitionPaths) {
this.partitionPaths = partitionPaths;
}

public String[] getPartitionPaths() {
return partitionPaths;
}
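A hedged sketch of how a test might use the new setter; the import path, the no-arg constructor, and the partition value are assumptions rather than part of this diff:

// import path may differ across Hudi versions
import org.apache.hudi.common.testutils.HoodieTestDataGenerator

val dataGen = new HoodieTestDataGenerator()
dataGen.setPartitionPaths(Array("2021/01/01"))   // hypothetical partition path
// subsequent generate* calls draw partition paths from the overridden array
val inserts = dataGen.generateInserts("001", 10)
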
@@ -26,6 +26,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -148,8 +149,7 @@ public boolean accept(Path path) {

// Skip all files that are descendants of .hoodie in its path.
String filePath = path.toString();
if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
|| filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) {
if (TablePathUtils.isHoodieMetaPath(filePath)) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Skipping Hoodie Metadata file %s \n", filePath));
}
@@ -96,7 +96,7 @@ class DefaultSource extends RelationProvider
" Falling back to Read Optimized query.")
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
} else {
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)(sqlContext.sparkSession)
}
} else {
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
@@ -23,11 +23,15 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.TablePathUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex.shouldFilterOut
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

@@ -77,18 +81,25 @@ object HoodieSparkUtils {
* @return list of absolute file paths
*/
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
val globPaths =
Contributor: can we enhance java docs (lines 76, 77) to convey that we do partition pruning as well within this method.
paths.flatMap(path => {
val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPaths = globPathIfNecessary(fs, qualified)
globPaths
})
globPaths.filterNot( path => TablePathUtils.isHoodieMetaPath(path.toString) || shouldFilterOut(path.getName))
}

def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}

def createInMemoryFileIndex(sparkSession: SparkSession, userSpecifiedSchema: Option[StructType], parameters: Map[String, String], globbedPaths: Seq[Path]): InMemoryFileIndex = {
Contributor: do we still need the other method (line 94 to 97) since we have this new method?
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(sparkSession, globbedPaths, parameters, userSpecifiedSchema, fileStatusCache)
}

def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
createRdd(df, avroSchema, structName, recordNamespace)
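A hedged usage sketch of the two helpers above; the table path is hypothetical and the import locations are assumed (HoodieSparkUtils lives in org.apache.hudi):

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("hudi-glob-sketch").getOrCreate()
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Globs the user-supplied patterns, then drops .hoodie metadata paths and hidden files.
val globbed = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq("/tmp/hudi_trips/*/*/*"), fs)

// Partition-aware index: listFiles(partitionFilters, dataFilters) can later prune partitions,
// which is what buildFileIndex in MergeOnReadSnapshotRelation relies on further down.
val index = HoodieSparkUtils.createInMemoryFileIndex(spark, None, Map.empty, globbed)
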
@@ -18,24 +18,29 @@

package org.apache.hudi

import com.google.common.annotations.VisibleForTesting
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.execution.hudi.utils.PushDownUtils
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, CatalystScan, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StructField, StructType}

import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable

case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
logPaths: Option[List[String]],
@@ -55,8 +60,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
val globPaths: Seq[Path],
val metaClient: HoodieTableMetaClient)
extends BaseRelation with PrunedFilteredScan with Logging {
val metaClient: HoodieTableMetaClient)(val sparkSession: SparkSession)
extends BaseRelation with CatalystScan with Logging {
Contributor: @garyli1019: what's the counterpart of this file for COW? I understand this PR is for MOR, but just for my understanding.
Member: we don't have an independent class counterpart for COW since we just need to pass a filter to a normal parquet reader.
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala#L165

private val conf = sqlContext.sparkContext.hadoopConfiguration
private val jobConf = new JobConf(conf)
@@ -68,7 +73,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
private val fileIndex = buildFileIndex()
private val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, Some(tableStructSchema), optParams, globPaths)
private var fileIndex: List[HoodieMergeOnReadFileSplit] = _
private val preCombineField = {
val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
if (preCombineFieldFromTableConfig != null) {
@@ -79,16 +85,33 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD)
}
}
override def schema: StructType = tableStructSchema

val partitionStructSchema = inMemoryFileIndex.partitionSpec().partitionColumns
val overlappedPartCols = mutable.Map.empty[String, StructField]
partitionStructSchema.foreach { partitionField =>
if (tableStructSchema.exists(getColName(_) == getColName(partitionField))) {
overlappedPartCols += getColName(partitionField) -> partitionField
}
}

// When data and partition schemas have overlapping columns, the output
// schema respects the order of the data schema for the overlapping columns, and it
// respects the data types of the partition schema.
override def schema: StructType = {
StructType(tableStructSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
partitionStructSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
}

override def needConversion: Boolean = false

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
fileIndex = buildFileIndex(filters)
val pushedFilters = PushDownUtils.transformFilter(filters)
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
log.debug(s" buildScan filters = ${pushedFilters.mkString(",")}")
var requiredStructSchema = StructType(Seq())
requiredColumns.foreach(col => {
val field = tableStructSchema.find(_.name == col)
val field = tableStructSchema.find(_.name == col.name)
if (field.isDefined) {
requiredStructSchema = requiredStructSchema.add(field.get)
}
@@ -106,18 +129,18 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
partitionSchema = partitionStructSchema,
requiredSchema = tableStructSchema,
filters = filters,
filters = pushedFilters,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
partitionSchema = partitionStructSchema,
requiredSchema = requiredStructSchema,
filters = filters,
filters = pushedFilters,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
@@ -132,9 +155,14 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
rdd.asInstanceOf[RDD[Row]]
}

def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
val fileStatuses = inMemoryFileIndex.allFiles()
def buildFileIndex(filters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
val selectedPartitions = inMemoryFileIndex.listFiles(filters, filters)
val selectedPartitionsPathMap = selectedPartitions.flatMap(x=>{
val files = x.files
val fileMap = files.map(file=>{(file.getPath.getName,x.values)})
fileMap
}).toMap
val fileStatuses = selectedPartitions.flatMap(_.files)
if (fileStatuses.isEmpty) {
throw new HoodieException("No files found for reading in user provided path.")
}
@@ -148,10 +176,22 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val fileSplits = fileGroup.map(kv => {
val baseFile = kv._1
val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
val partitionValues = selectedPartitionsPathMap.get(baseFile.getFileName).get
val partitionedFile = PartitionedFile(partitionValues, baseFile.getPath, 0, baseFile.getFileLen)
HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
}).toList
fileSplits
}

private def getColName(f: StructField): String = {
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
f.name
} else {
f.name.toLowerCase(Locale.ROOT)
}
}

@VisibleForTesting
def getFileIndexPaths = fileIndex.map(x => x.dataFile.get.filePath)
Contributor: Is there any annotation like VisibleOnlyForTests as we have in Java? Not a big fan of introducing public methods in source code just for test purposes.
}
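To make the data/partition schema merge above concrete, a small self-contained illustration with made-up column names (case sensitivity is ignored for brevity):

import org.apache.spark.sql.types._

val dataSchema      = StructType(Seq(StructField("uuid", StringType), StructField("dt", StringType)))
val partitionSchema = StructType(Seq(StructField("dt", DateType)))

// Overlapping columns keep the data schema's position but take the partition schema's type.
val overlapped = partitionSchema.filter(p => dataSchema.exists(_.name == p.name))
  .map(f => f.name -> f).toMap
val merged = StructType(
  dataSchema.map(f => overlapped.getOrElse(f.name, f)) ++
    partitionSchema.filterNot(f => overlapped.contains(f.name)))
// merged: StructType(StructField(uuid,StringType,true), StructField(dt,DateType,true))
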
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.hudi.utils

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.DataSourceStrategy.translateFilter
import org.apache.spark.sql.sources.{BaseRelation, Filter}

/**
* This util object uses DataSourceStrategy's protected translateFilter method
* [https://github.com/apache/spark/blob/v2.4.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L439]
*/
object PushDownUtils {
Contributor: java docs.

/**
* Tries to translate a Catalyst Seq[Expression] into data source Array[Filter].
*/
def transformFilter(filterPredicates: Seq[Expression]): Array[Filter] = {
val translatedMap: Map[Expression, Filter] = filterPredicates.flatMap { p =>
translateFilter(p).map(f => p -> f)
}.toMap
translatedMap.values.toArray
}
}
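A hedged sketch of how transformFilter bridges the Catalyst expressions received by CatalystScan.buildScan to data source Filters; the column name and literal are hypothetical:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.execution.hudi.utils.PushDownUtils
import org.apache.spark.sql.types.StringType

val dt = AttributeReference("dt", StringType)()          // hypothetical partition column
val predicate = EqualTo(dt, Literal("2021-01-01"))

// Expressions with a data source equivalent become Filters that can be handed to
// ParquetFileFormat.buildReaderWithPartitionValues for pushdown; untranslatable ones are dropped.
val pushed = PushDownUtils.transformFilter(Seq(predicate))
// pushed: Array(EqualTo(dt,2021-01-01))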