42 changes: 25 additions & 17 deletions hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -53,29 +53,37 @@ class DefaultSource extends RelationProvider
     // Add default options for unspecified read options keys.
     val parameters = Map(QUERY_TYPE_OPT_KEY -> DEFAULT_QUERY_TYPE_OPT_VAL) ++ translateViewTypesToQueryTypes(optParams)
 
+    // TODO: Determine whether table is bootstrapped or not
+    val isBootstrapped = true
+
     val path = parameters.get("path")
     if (path.isEmpty) {
       throw new HoodieException("'path' must be specified.")
     }
 
     if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      // this is just effectively RO view only, where `path` can contain a mix of
-      // non-hoodie/hoodie path files. set the path filter up
-      sqlContext.sparkContext.hadoopConfiguration.setClass(
-        "mapreduce.input.pathFilter.class",
-        classOf[HoodieROTablePathFilter],
-        classOf[org.apache.hadoop.fs.PathFilter])
-
-      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
-      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
-        "Please query the Hive table registered using Spark SQL.")
-      // simply return as a regular parquet relation
-      DataSource.apply(
-        sparkSession = sqlContext.sparkSession,
-        userSpecifiedSchema = Option(schema),
-        className = "parquet",
-        options = parameters)
-        .resolveRelation()
+
+      if (isBootstrapped) {
+        new HudiBootstrapRelation(sqlContext, schema, path.get, optParams)
+      } else {
+        // this is just effectively RO view only, where `path` can contain a mix of
+        // non-hoodie/hoodie path files. set the path filter up
+        sqlContext.sparkContext.hadoopConfiguration.setClass(
+          "mapreduce.input.pathFilter.class",
+          classOf[HoodieROTablePathFilter],
+          classOf[org.apache.hadoop.fs.PathFilter])
+
+        log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
+        log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
+          "Please query the Hive table registered using Spark SQL.")
+        // simply return as a regular parquet relation
+        DataSource.apply(
+          sparkSession = sqlContext.sparkSession,
+          userSpecifiedSchema = Option(schema),
+          className = "parquet",
+          options = parameters)
+          .resolveRelation()
+      }
     } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
       new IncrementalRelation(sqlContext, path.get, optParams, schema)
     } else {
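For context, a minimal sketch of how this new branch would be exercised from Spark, assuming the usual Hudi read-option constants from DataSourceReadOptions and a placeholder table path; since isBootstrapped is hard-coded to true, every snapshot query on this build resolves to HudiBootstrapRelation:

// Illustrative only: basePath is a placeholder, and path handling (base path vs.
// glob) depends on which relation ends up being resolved.
import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object BootstrapSnapshotReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-bootstrap-snapshot-read")
      .master("local[2]")
      .getOrCreate()

    val basePath = "file:///tmp/hudi_bootstrap_table"

    // Snapshot query type routes through the branch added above.
    val df = spark.read
      .format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
        DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load(basePath)

    df.show(10, truncate = false)
    spark.stop()
  }
}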
153 changes: 153 additions & 0 deletions hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

class HudiBootstrapRDD(@transient spark: SparkSession,
                       dataReadFunction: PartitionedFile => Iterator[Any],
                       skeletonReadFunction: PartitionedFile => Iterator[Any],
                       regularReadFunction: PartitionedFile => Iterator[Any],
                       dataSchema: StructType,
                       skeletonSchema: StructType,
                       requiredColumns: Array[String],
                       tableState: HudiBootstrapTableState)
  extends RDD[InternalRow](spark.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val bootstrapPartition = split.asInstanceOf[HudiBootstrapPartition]

    if (bootstrapPartition.split.skeletonFile.isDefined) {
      logInfo("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
        + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
        + bootstrapPartition.split.skeletonFile.get.filePath)
    } else {
      logInfo("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
        + bootstrapPartition.split.dataFile.filePath)
    }

    var partitionedFileIterator: Iterator[Any] = null

    if (bootstrapPartition.split.skeletonFile.isDefined) {
      val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile)
      val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
      partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
    } else {
      partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile)
    }

    import scala.collection.JavaConverters._
    val rows = partitionedFileIterator.flatMap(_ match {
      case r: InternalRow => Seq(r)
      case b: ColumnarBatch => b.rowIterator().asScala
    })
    rows
  }

  def merge(skeletonFileIterator: Iterator[Any], dataFileIterator: Iterator[Any]): Iterator[Any] = {
    new Iterator[Any] {
      override def hasNext: Boolean = skeletonFileIterator.hasNext && dataFileIterator.hasNext

      override def next(): Any = {
        val skeletonEntity = skeletonFileIterator.next()
        val dataEntity = dataFileIterator.next()

        (skeletonEntity, dataEntity) match {
          case (skeleton: ColumnarBatch, data: ColumnarBatch) => {
            mergeColumnarBatch(skeleton, data)
          }
          case (skeleton: InternalRow, data: InternalRow) => {
            mergeInternalRow(skeleton, data)
          }
        }
      }
    }
  }

  def mergeColumnarBatch(skeletonBatch: ColumnarBatch, dataBatch: ColumnarBatch): ColumnarBatch = {
    val mergedColumnVectors = requiredColumns.map(col => {
      if (skeletonSchema.fieldNames.contains(col)) {
        val idx = skeletonSchema.fieldIndex(col)
        skeletonBatch.column(idx)
      } else {
        val idx = dataSchema.fieldIndex(col)
        dataBatch.column(idx)
      }
    })

    val mergedBatch = new ColumnarBatch(mergedColumnVectors)
    mergedBatch.setNumRows(dataBatch.numRows())
    mergedBatch
  }

  def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
    val skeletonArr = skeletonRow.toSeq(skeletonSchema)
    val dataArr = dataRow.toSeq(dataSchema)
    // We need to return it in the order requested
    val mergedArr = requiredColumns.map(col => {
      if (skeletonSchema.fieldNames.contains(col)) {
        val idx = skeletonSchema.fieldIndex(col)
        skeletonArr(idx)
      } else {
        val idx = dataSchema.fieldIndex(col)
        dataArr(idx)
      }
    })

    logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
    val mergedRow = InternalRow.fromSeq(mergedArr)
    mergedRow
  }

  def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
    : Iterator[InternalRow] = {
    val fileIterator = readFileFunction(partitionedFile)

    import scala.collection.JavaConverters._

    val rows = fileIterator.flatMap(_ match {
      case r: InternalRow => Seq(r)
      case b: ColumnarBatch => b.rowIterator().asScala
    })
    rows
  }

  override protected def getPartitions: Array[Partition] = {
    logInfo("Getting partitions..")

    tableState.files.zipWithIndex.map(file => {
      if (file._1.skeletonFile.isDefined) {
        logInfo("Forming partition with => " + file._2 + "," + file._1.dataFile.filePath
          + "," + file._1.skeletonFile.get.filePath)
        HudiBootstrapPartition(file._2, file._1)
      } else {
        logInfo("Forming partition with => " + file._2 + "," + file._1.dataFile.filePath)
        HudiBootstrapPartition(file._2, file._1)
      }
    }).toArray
  }
}

case class HudiBootstrapPartition(index: Int, split: HudiBootstrapSplit) extends Partition
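The merge path above relies on the skeleton and data files yielding the same number of records in the same order, so rows can be zipped by position and columns stitched together by name. A toy sketch of that stitching rule in plain Scala (no Spark types; the schemas and values below are made up for illustration):

// For each requested column, take it from the skeleton record when the skeleton
// schema contains that field, otherwise from the data record.
object MergeSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val skeletonSchema = Seq("_hoodie_commit_time", "_hoodie_record_key")
    val dataSchema = Seq("id", "name", "price")

    // One record from each file; positions line up with the schemas above.
    val skeletonRow: Seq[Any] = Seq("20200316143245", "key_001")
    val dataRow: Seq[Any] = Seq(1, "widget", 9.99)

    // Columns requested by the query, in the order they were requested.
    val requiredColumns = Array("id", "_hoodie_record_key", "price")

    val merged = requiredColumns.map { col =>
      if (skeletonSchema.contains(col)) skeletonRow(skeletonSchema.indexOf(col))
      else dataRow(dataSchema.indexOf(col))
    }

    println(merged.mkString(", ")) // prints: 1, key_001, 9.99
  }
}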