@@ -17,8 +17,9 @@

 package org.apache.hudi

-import java.util.stream.Collectors
+import org.apache.avro.Schema

+import java.util.stream.Collectors
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -89,8 +90,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
     } else {
       schemaResolver.getTableAvroSchemaWithoutMetadataFields()
     }
-    val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-    StructType(skeletonSchema.fields ++ dataSchema.fields)
+    if (tableSchema.getType == Schema.Type.NULL) {
+      // if the only commit in the table is an empty commit without a schema, return an empty schema here
+      StructType(Nil)
+    } else {
+      val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+      StructType(skeletonSchema.fields ++ dataSchema.fields)
+    }
   }

   private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
@@ -99,86 +105,90 @@
   override def schema: StructType = usedSchema

   override def buildScan(): RDD[Row] = {
-    val regularFileIdToFullPath = mutable.HashMap[String, String]()
-    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
-
-    // create Replaced file group
-    val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
-    val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
-      val replaceMetadata = HoodieReplaceCommitMetadata.
-        fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
-      replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
-        entry.getValue.map { e =>
-          val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
-          (e, fullPath)
-        }
-      }
-    }.toMap
-
-    for (commit <- commitsToReturn) {
-      val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
-        .get, classOf[HoodieCommitMetadata])
-
-      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
-        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
-        }
-      } else {
-        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
-        }
-      }
-    }
-
-    if (metaBootstrapFileIdToFullPath.nonEmpty) {
-      // filter out meta bootstrap files that have had more commits since metadata bootstrap
-      metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
-        .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
-    }
-
-    val pathGlobPattern = optParams.getOrElse(
-      DataSourceReadOptions.INCR_PATH_GLOB.key,
-      DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
-    val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
-      if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
-        val globMatcher = new GlobPattern("*" + pathGlobPattern)
-        (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
-          metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
-      } else {
-        (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
-      }
-    }
-    // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
-    // will filter out all the files incorrectly.
-    sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
-    val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
-    } else {
-      log.info("Additional Filters to be applied to incremental source are :" + filters)
-
-      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
-
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-          .format("hudi")
-          .schema(usedSchema)
-          .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
-          .load()
-      }
-
-      if (regularFileIdToFullPath.nonEmpty)
-      {
-        df = df.union(sqlContext.read.options(sOpts)
-          .schema(usedSchema)
-          .parquet(filteredRegularFullPaths.toList: _*)
-          .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            commitsToReturn.head.getTimestamp))
-          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            commitsToReturn.last.getTimestamp)))
-      }
-
-      filters.foldLeft(df)((e, f) => e.filter(f)).rdd
-    }
+    if (usedSchema == StructType(Nil)) {
+      // if the first commit in the table is an empty commit without a schema, return an empty RDD here
+      sqlContext.sparkContext.emptyRDD[Row]
+    } else {
+      val regularFileIdToFullPath = mutable.HashMap[String, String]()
+      var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
+      // create Replaced file group
+      val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+      val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+        val replaceMetadata = HoodieReplaceCommitMetadata.
+          fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+        replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+          entry.getValue.map { e =>
+            val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+            (e, fullPath)
+          }
+        }
+      }.toMap
+
+      for (commit <- commitsToReturn) {
+        val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
+          .get, classOf[HoodieCommitMetadata])
+
+        if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+          metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
+        } else {
+          regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
+        }
+      }
+
+      if (metaBootstrapFileIdToFullPath.nonEmpty) {
+        // filter out meta bootstrap files that have had more commits since metadata bootstrap
+        metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+          .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
+      }
+
+      val pathGlobPattern = optParams.getOrElse(
+        DataSourceReadOptions.INCR_PATH_GLOB.key,
+        DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
+      val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+        if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
+          val globMatcher = new GlobPattern("*" + pathGlobPattern)
+          (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+            metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+        } else {
+          (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
+        }
+      }
+      // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
+      // will filter out all the files incorrectly.
+      sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+      val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
+      if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+        sqlContext.sparkContext.emptyRDD[Row]
+      } else {
+        log.info("Additional Filters to be applied to incremental source are :" + filters)

+        var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)

+        if (metaBootstrapFileIdToFullPath.nonEmpty) {
+          df = sqlContext.sparkSession.read
+            .format("hudi")
+            .schema(usedSchema)
+            .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+            .load()
+        }

+        if (regularFileIdToFullPath.nonEmpty) {
+          df = df.union(sqlContext.read.options(sOpts)
+            .schema(usedSchema)
+            .parquet(filteredRegularFullPaths.toList: _*)
+            .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.head.getTimestamp))
+            .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.last.getTimestamp)))
+        }

+        filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+      }
+    }
   }
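Taken together, the two Scala hunks make both schema resolution and the scan tolerate a table whose only completed commit carries no schema: usedSchema resolves to StructType(Nil), and buildScan() short-circuits to an empty RDD instead of failing while converting a NULL Avro schema to a StructType. A minimal reader-side sketch of the resulting behavior, assuming a local Spark session; the table path and begin instant below are hypothetical, while the option keys are Hudi's standard incremental-read options:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-empty-commit-incr-read")
  .master("local[*]")
  .getOrCreate()

// Incremental query against a table whose only commit is empty: with this
// change the relation resolves an empty schema and yields zero rows rather
// than throwing during Avro-to-StructType conversion.
val incrDf = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "000") // from the start of the timeline
  .load("/tmp/hudi_trips") // hypothetical base path

println(incrDf.count()) // prints 0 for the empty-commit-only table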
@@ -110,6 +110,10 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
     Dataset<Row> source = metaReader.load(srcPath);

+    if (source.isEmpty()) {
+      return Pair.of(Option.empty(), instantEndpts.getRight());
+    }
+
     String filter = "s3.object.size > 0";
     if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
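The Java hunk above, in what appears to be the DeltaStreamer S3 events incremental source (note Config.S3_KEY_PREFIX and the s3.object.size filter), applies the same guard on the ingestion side: an empty incremental frame returns no batch but still hands back the end instant as the checkpoint, so the next round resumes past the empty range instead of re-reading it. A hedged Scala rendering of that guard; the method name and shape are illustrative, the actual change is the four added Java lines shown:

import org.apache.spark.sql.DataFrame

// Sketch of the empty-batch guard in fetchNextBatch(): emit no batch when the
// incremental pull has no rows, but still advance the checkpoint to the end
// instant so progress is not lost.
def nextBatch(source: DataFrame, endInstant: String): (Option[DataFrame], String) =
  if (source.isEmpty) {
    (None, endInstant) // no data; the checkpoint still moves forward
  } else {
    (Some(source.filter("s3.object.size > 0")), endInstant)
  }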