@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -470,4 +471,44 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFi
}
}

/**
* Iterate through a list of commits in ascending order, and extract the file status of
* all affected files from the commit metadata, grouped by partition path. If a file has
* been touched multiple times in the given commits, the return value keeps the entry
* from the latest commit.
* @param basePath base path of the Hudi table
* @param commitsToCheck commits whose metadata should be inspected
* @param timeline timeline used to look up the instant details
* @return HashMap<partitionPath, HashMap<fileName, FileStatus>>
* @throws IOException if the commit metadata cannot be read
*/
public static HashMap<String, HashMap<String, FileStatus>> listAffectedFilesForCommits(
Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
// TODO: Use HoodieMetaTable to extract the affected files directly.
HashMap<String, HashMap<String, FileStatus>> partitionToFileStatusesMap = new HashMap<>();
List<HoodieInstant> sortedCommitsToCheck = new ArrayList<>(commitsToCheck);
sortedCommitsToCheck.sort(HoodieInstant::compareTo);
// Iterate through the given commits.
for (HoodieInstant commit: sortedCommitsToCheck) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
// Iterate through all the affected partitions of a commit.
for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
partitionToFileStatusesMap.put(entry.getKey(), new HashMap<>());
}
// Iterate through all the written files of this partition.
for (HoodieWriteStat stat : entry.getValue()) {
String relativeFilePath = stat.getPath();
Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
if (fullPath != null) {
FileStatus fs = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
0, fullPath);
partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs);
}
}
}
}
return partitionToFileStatusesMap;
}
}
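For context, a minimal sketch of how the Spark side consumes this helper (mirroring buildFileIndex in the new MergeOnReadIncrementalRelation further down); metaClient, commitsToReturn and commitsTimelineToReturn are assumed to be in scope, with scala.collection.JavaConversions._ imported:

val partitionsWithFileStatus = listAffectedFilesForCommits(
  new Path(metaClient.getBasePath), commitsToReturn, commitsTimelineToReturn)
// Flatten HashMap<partitionPath, HashMap<fileName, FileStatus>> into a FileStatus array
// and build a file-system view restricted to the incremental window.
val affectedFileStatus = new ListBuffer[FileStatus]
partitionsWithFileStatus.iterator.foreach(p =>
  p._2.iterator.foreach(status => affectedFileStatus += status._2))
val fsView = new HoodieTableFileSystemView(metaClient, commitsTimelineToReturn, affectedFileStatus.toArray)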
@@ -94,7 +94,12 @@ class DefaultSource extends RelationProvider
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
new IncrementalRelation(sqlContext, tablePath, optParams, schema)
val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
} else {
new IncrementalRelation(sqlContext, optParams, schema, metaClient)
}
} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
}
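For reference, a hedged sketch of the reader options that route into this new branch, assuming a MERGE_ON_READ table at tablePath, the usual "hudi" format registration, and a SparkSession named spark; the instant times are placeholders:

val incrementalDF = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200101000000")
  // Optional; defaults to the latest completed instant.
  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20200102000000")
  .load(tablePath)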
@@ -50,30 +50,32 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val confBroadcast = sc.broadcast(new SerializableWritable(config))

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
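// Pick an iterator based on the shape of the split: base file only, log files only,
// or base + log files merged either by skipping the merge (REALTIME_SKIP_MERGE)
// or by combining record payloads (REALTIME_PAYLOAD_COMBINE).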
val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
mergeParquetPartition.split match {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
logFileIterator(logFileOnlySplit, getConfig)
case skipMergeSplit if skipMergeSplit.mergeType
.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
skipMergeFileIterator(
skipMergeSplit,
read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
getConfig
)
case payloadCombineSplit if payloadCombineSplit.mergeType
.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
payloadCombineFileIterator(
payloadCombineSplit,
read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
getConfig
)
case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
s"spark partition Index: ${mergeParquetPartition.index}" +
s"merge type: ${mergeParquetPartition.split.mergeType}")
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
s"log paths: ${mergeOnReadPartition.split.logPaths.toString}" +
s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
s"spark partition Index: ${mergeOnReadPartition.index}" +
s"merge type: ${mergeOnReadPartition.split.mergeType}")
}
}

@@ -101,6 +103,44 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
rows
}

private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
private val requiredFieldPosition =
tableState.requiredStructSchema
.map(f => tableAvroSchema.getField(f.name).pos()).toList
private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
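// Scan this split's log files eagerly; the scanner returns the resulting log records keyed by record key.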
private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala

private var recordToLoad: InternalRow = _
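// hasNext advances the log-record key iterator, skips delete records via the recursive call,
// and stages the next row in recordToLoad for next() to return.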
override def hasNext: Boolean = {
if (logRecordsKeyIterator.hasNext) {
val curAvrokey = logRecordsKeyIterator.next()
val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
if (!curAvroRecord.isPresent) {
// delete record found, skipping
this.hasNext
} else {
val requiredAvroRecord = AvroConversionUtils
.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
true
}
} else {
false
}
}

override def next(): InternalRow = {
recordToLoad
}
}

private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
@@ -42,19 +42,14 @@ import scala.collection.mutable
*
*/
class IncrementalRelation(val sqlContext: SQLContext,
val basePath: String,
val optParams: Map[String, String],
val userSchema: StructType) extends BaseRelation with TableScan {
val userSchema: StructType,
val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {

private val log = LogManager.getLogger(classOf[IncrementalRelation])

val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)

// MOR tables not supported yet
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
}
private val basePath = metaClient.getBasePath
// TODO : Figure out a valid HoodieWriteConfig
private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobPattern, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.log4j.LogManager
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

/**
* Experimental.
* Relation that implements the Hoodie incremental view for Merge On Read tables.
*
*/
class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
val userSchema: StructType,
val metaClient: HoodieTableMetaClient)
extends BaseRelation with PrunedFilteredScan {

private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
private val conf = sqlContext.sparkContext.hadoopConfiguration
private val jobConf = new JobConf(conf)
private val fs = FSUtils.getFs(metaClient.getBasePath, conf)
private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
}
if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) {
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
}

private val lastInstant = commitTimeline.lastInstant().get()
private val mergeType = optParams.getOrElse(
DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)

private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
private val schemaUtil = new TableSchemaResolver(metaClient)
private val tableAvroSchema = schemaUtil.getTableAvroSchema
private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
private val fileIndex = buildFileIndex()

override def schema: StructType = tableStructSchema

override def needConversion: Boolean = false

override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
}

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s"buildScan filters = ${filters.mkString(",")}")
// Configs to ensure the parquet push-down filters are applied at record level;
// record-level filtering requires the vectorized reader to be disabled.
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
val pushDownFilter = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
filters :+ isNotNullFilter :+ largerThanFilter :+ lessThanFilter
}
var requiredStructSchema = StructType(Seq())
requiredColumns.foreach(col => {
val field = tableStructSchema.find(_.name == col)
if (field.isDefined) {
requiredStructSchema = requiredStructSchema.add(field.get)
}
})
val requiredAvroSchema = AvroConversionUtils
.convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
val hoodieTableState = HoodieMergeOnReadTableState(
tableStructSchema,
requiredStructSchema,
tableAvroSchema.toString,
requiredAvroSchema.toString,
fileIndex
)
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = tableStructSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = requiredStructSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)

// Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
FileSystem.getLocal(jobConf)
SparkHadoopUtil.get.addCredentials(jobConf)
val rdd = new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
jobConf,
fullSchemaParquetReader,
requiredSchemaParquetReader,
hoodieTableState
)
rdd.asInstanceOf[RDD[Row]]
}

def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath),
commitsToReturn, commitsTimelineToReturn)
val affectedFileStatus = new ListBuffer[FileStatus]
partitionsWithFileStatus.iterator.foreach(p =>
p._2.iterator.foreach(status => affectedFileStatus += status._2))
val fsView = new HoodieTableFileSystemView(metaClient,
commitsTimelineToReturn, affectedFileStatus.toArray)

// Iterate partitions to create splits
val fileGroup = partitionsWithFileStatus.keySet().flatMap(partitionPath =>
fsView.getAllFileGroups(partitionPath).iterator()
).toList
val latestCommit = fsView.getLastInstant.get().getTimestamp
if (log.isDebugEnabled) {
fileGroup.foreach(f => log.debug(s"current file group id: " +
s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}"))
}

// Filter files based on user defined glob pattern
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
val filteredFileGroup = if (!pathGlobPattern
.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
fileGroup.filter(f => {
if (f.getLatestFileSlice.get().getBaseFile.isPresent) {
globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath)
} else {
globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString)
}
})
} else {
fileGroup
}

// Build HoodieMergeOnReadFileSplit.
filteredFileGroup.map(f => {
// Ensure we get the base file when there is a pending compaction, in which case the base file
// won't be in the latest file slice.
val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
val partitionedFile = if (baseFiles.nonEmpty) {
val baseFile = baseFiles.head.getBaseFile
Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen))
}
else {
Option.empty
}

val logPath = if (f.getLatestFileSlice.isPresent) {
// If the log path doesn't exist, we still include an empty path to avoid using
// the default parquet reader, so that the push-down filter will be applied.
Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList
.map(logfile => logfile.getPath.toString))
}
else {
Option.empty
}

HoodieMergeOnReadFileSplit(partitionedFile, logPath,
latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
})
}
}