Merged
40 commits
4d582cf
Refactored `HoodieTestDataGenerator` to make data generation reproduc…
Feb 19, 2022
f612d96
Extracted common fields to `HoodieBaseRelation`;
Feb 14, 2022
4662f7d
Cleaned up `BaseFileOnlyViewRelation`
Feb 14, 2022
46c56ef
Streamlined `BaseFileOnlyViewRelation`
Feb 14, 2022
2449ea1
Introduced `HoodieBaseRDD` to make sure all Hudi custom impls are imp…
Feb 15, 2022
8f466d1
Fixed all of the Relations to avoid superfluous Row <> InternalRow co…
Feb 15, 2022
ef42921
Tidying up
Feb 15, 2022
f12aa31
Fixing compilation
Feb 15, 2022
4d2d93b
Fixing compilation after rebase
Feb 18, 2022
ad4a142
After rebase fixes
Feb 18, 2022
fedd8ef
Fixing compilation
Feb 18, 2022
c4bbe3f
`HoodieBaseRDD` > `HoodieUnsafeRDD`;
Feb 19, 2022
0b3079c
`HoodieBaseRDD` > `HoodieUnsafeRDD`;
Feb 19, 2022
c24d7d2
Bootstrapped `HoodieUnsafeRDDUtils`
Feb 19, 2022
a6260fd
Missing license
Feb 19, 2022
2ec56ef
Bootstrapped test for MOR table validating proper column projections …
Feb 21, 2022
845d290
Amended test to run against non-partitioned table (to query just a si…
Feb 22, 2022
cd755c1
Tidying up
Feb 22, 2022
ebee576
Added tests with no Delta Logs
Feb 22, 2022
568e5da
Extracted Parquet projection related tests into standalone file
Feb 22, 2022
808657c
Added payload-combine merging cases
Feb 22, 2022
081197b
Tiyding up
Feb 22, 2022
f8f3ec2
Broken down into multiple tests
Feb 22, 2022
ac2684a
Added test for COW
Feb 22, 2022
b0218c5
Added test for MOR Incremental Relation
Feb 22, 2022
edf7c32
Expose `mandatoryColumns` to be accessible in tests
Feb 22, 2022
21abebc
Typo
Feb 23, 2022
d6b2b6e
`HoodieAvroSerializerTrait` > `HoodieAvroSerializer`;
Feb 24, 2022
c81f822
`lint`
Feb 24, 2022
62edf7d
Added `SparkHadoopUtil` delegating to Spark's private impl
Mar 4, 2022
e3902bf
Fixed tests for Spark 2.x
Mar 4, 2022
b752778
Tidying up
Mar 4, 2022
2965980
Removing logs from MOR / Incremental test
Mar 4, 2022
880f4c9
Removed `SparkHadoopUtil`
Mar 4, 2022
be33a18
Fixed `HoodieWriteConfig` to be able to disable compression in Parquet
Mar 4, 2022
c048190
Disable compression in MOR / Incremental test to make test stable
Mar 4, 2022
ed632b8
Disable MOR / Incremental test
Mar 4, 2022
97b6d31
Disable full-table-read tests as unstable in Spark 2.x
Mar 5, 2022
1ecd626
Incorrect schema ref
Mar 7, 2022
448555f
Fixing compilation
Mar 7, 2022
@@ -44,6 +44,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
@@ -1540,7 +1541,8 @@ public double getParquetCompressionRatio() {
}

public CompressionCodecName getParquetCompressionCodec() {
return CompressionCodecName.fromConf(getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME));
String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName);
}

public boolean parquetDictionaryEnabled() {
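
The `StringUtils.isNullOrEmpty` check in `getParquetCompressionCodec` above matters because of how parquet-mr resolves codec names: `CompressionCodecName.fromConf(null)` falls back to `UNCOMPRESSED`, while an empty string would fail the enum lookup. A minimal sketch of that assumed behavior, which is what lets tests disable Parquet compression by clearing the codec setting:

```scala
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Assumed parquet-mr behavior motivating the isNullOrEmpty guard:
val disabled = CompressionCodecName.fromConf(null)   // UNCOMPRESSED -> compression effectively off
val gzip     = CompressionCodecName.fromConf("gzip") // case-insensitive lookup -> GZIP
// CompressionCodecName.fromConf("") would throw IllegalArgumentException,
// hence an empty config value is mapped to null first.
```
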
@@ -177,14 +177,24 @@ object HoodieSparkUtils extends SparkAdapterSupport {
* Convert Filters to Catalyst Expressions and joined by And. If convert success return an
* Non-Empty Option[Expression],or else return None.
*/
def convertToCatalystExpressions(filters: Array[Filter],
tableSchema: StructType): Option[Expression] = {
val expressions = filters.map(convertToCatalystExpression(_, tableSchema))
def convertToCatalystExpressions(filters: Seq[Filter],
tableSchema: StructType): Seq[Option[Expression]] = {
filters.map(convertToCatalystExpression(_, tableSchema))
}


/**
* Convert Filters to Catalyst Expressions and joined by And. If convert success return an
* Non-Empty Option[Expression],or else return None.
*/
def convertToCatalystExpression(filters: Array[Filter],
tableSchema: StructType): Option[Expression] = {
val expressions = convertToCatalystExpressions(filters, tableSchema)
if (expressions.forall(p => p.isDefined)) {
if (expressions.isEmpty) {
None
} else if (expressions.length == 1) {
expressions(0)
expressions.head
} else {
Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And))
}
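
For reference, a usage sketch of the reworked helpers (the schema and filters below are made up): `convertToCatalystExpressions` now returns one `Option[Expression]` per pushed-down `Filter`, while `convertToCatalystExpression` keeps the previous all-or-nothing behavior, reducing the converted expressions with `And`.

```scala
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical table schema and pushed-down filters, for illustration only
val tableSchema = StructType(Seq(
  StructField("uuid", StringType),
  StructField("ts", LongType)))

val filters: Array[Filter] = Array(EqualTo("uuid", "id-1"), GreaterThan("ts", 0L))

// One Option[Expression] per filter; None marks a filter that could not be converted
val perFilter = HoodieSparkUtils.convertToCatalystExpressions(filters, tableSchema)

// All filters converted and reduced with And, or None if any single conversion failed
val combined = HoodieSparkUtils.convertToCatalystExpression(filters, tableSchema)
```
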
@@ -24,12 +24,6 @@ package org.apache.spark.sql.avro
* If you're looking to convert Avro into "deserialized" [[Row]] (comprised of Java native types),
* please check [[AvroConversionUtils]]
*/
trait HoodieAvroDeserializerTrait {
final def deserialize(data: Any): Option[Any] =
doDeserialize(data) match {
case opt: Option[_] => opt // As of Spark 3.1, this will return data wrapped with Option, so we fetch the data
case row => Some(row) // For other Spark versions, return the data as is
}

protected def doDeserialize(data: Any): Any
trait HoodieAvroDeserializer {
def deserialize(data: Any): Option[Any]
}
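
With the trait slimmed down to a single abstract method, the `Option` normalization that `HoodieAvroDeserializerTrait.deserialize` used to perform (Spark 3.1 already wraps the result in an `Option`, earlier versions return the row directly) has to live in each Spark-version-specific implementation instead. A rough sketch of what such an implementation might look like; the class name and `underlying` function are hypothetical:

```scala
import org.apache.spark.sql.avro.HoodieAvroDeserializer

// `underlying` stands in for the Spark-native AvroDeserializer#deserialize call,
// whose return type differs across Spark versions
class VersionAwareAvroDeserializer(underlying: Any => Any) extends HoodieAvroDeserializer {
  override def deserialize(data: Any): Option[Any] = underlying(data) match {
    case opt: Option[_] => opt.asInstanceOf[Option[Any]] // Spark 3.1+: already wrapped in Option
    case row            => Some(row)                     // older Spark versions: wrap the raw row
  }
}
```
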
@@ -23,6 +23,6 @@ package org.apache.spark.sql.avro
* NOTE: This is low-level component operating on Spark internal data-types (comprising [[InternalRow]]).
* If you're looking to convert "deserialized" [[Row]] into Avro, please check [[AvroConversionUtils]]
*/
trait HoodieAvroSerializerTrait {
trait HoodieAvroSerializer {
def serialize(catalystData: Any): Any
}
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi

import org.apache.avro.Schema
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait}
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -43,16 +43,16 @@ import java.util.Locale
trait SparkAdapter extends Serializable {

/**
* Creates instance of [[HoodieAvroSerializerTrait]] providing for ability to serialize
* Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize
* Spark's [[InternalRow]] into Avro payloads
*/
def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait
def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer

/**
* Creates instance of [[HoodieAvroDeserializerTrait]] providing for ability to deserialize
* Creates instance of [[HoodieAvroDeserializer]] providing for ability to deserialize
* Avro payloads into Spark's [[InternalRow]]
*/
def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait
def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer

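
A hedged sketch of how the two factory methods are meant to be used together; the helper below is illustrative only, and the adapter instance is assumed to come from `SparkAdapterSupport#sparkAdapter`:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.types.StructType

// Round-trips an InternalRow through the adapter-provided Avro serializer/deserializer
def avroRoundTrip(adapter: SparkAdapter,
                  avroSchema: Schema,
                  catalystType: StructType,
                  row: InternalRow): Option[Any] = {
  val serializer = adapter.createAvroSerializer(catalystType, avroSchema, nullable = false)
  val deserializer = adapter.createAvroDeserializer(avroSchema, catalystType)

  val avroPayload = serializer.serialize(row) // InternalRow -> Avro payload (e.g. GenericRecord)
  deserializer.deserialize(avroPayload)       // Avro payload -> Option[InternalRow]
}
```
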
/**
* Create the SparkRowSerDe.
@@ -19,6 +19,13 @@

package org.apache.hudi.testutils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -28,6 +35,7 @@
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -42,6 +50,7 @@
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.table.HoodieSparkTable;
@@ -50,14 +59,11 @@
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
@@ -69,6 +75,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
@@ -348,6 +355,21 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean
.withRollbackUsingMarkers(rollbackUsingMarkers);
}

protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
List<GenericRecord> avroRecords = records.stream()
.map(r -> {
HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
try {
return (GenericRecord) payload.getInsertValue(schema).get();
} catch (IOException e) {
throw new HoodieIOException("Failed to extract Avro payload", e);
}
})
.collect(Collectors.toList());
JavaRDD<GenericRecord> jrdd = jsc.parallelize(avroRecords, 2);
return AvroConversionUtils.createDataFrame(jrdd.rdd(), schema.toString(), spark);
}

protected int incrementTimelineServicePortToUse() {
// Increment the timeline service port for each individual test
// to avoid port reuse causing failures
@@ -133,7 +133,7 @@ public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
// This can be simplified using SpecificData.deepcopy once this bug is fixed
// https://issues.apache.org/jira/browse/AVRO-1811
//
// NOTE: {@code HoodieMetadataRecord} has to always carry both "key" nad "type" fields
// NOTE: {@code HoodieMetadataRecord} has to always carry both "key" and "type" fields
// for it to be handled appropriately, therefore these fields have to be reflected
// in any (read-)projected schema
key = record.get(KEY_FIELD_NAME).toString();
@@ -18,63 +18,82 @@

package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.HoodieBaseRelation.createBaseFileReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{BooleanType, StructType}
import org.apache.spark.sql.types.StructType

/**
* The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet),
* like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode.
* [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying
* modes:
* <ul>
* <li>For COW tables: Snapshot</li>
* <li>For MOR tables: Read-optimized</li>
* </ul>
*
* NOTE: The reason this Relation is used in lieu of Spark's default [[HadoopFsRelation]] is primarily due to the
* fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, partition path might not necessarily be equal to the
* verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect
* partition field values being written
*/
class BaseFileOnlyViewRelation(
sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path]
) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")

val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
.getOrElse(Literal(true, BooleanType))
val (partitionFilters, dataFilters) = {
val splited = filters.map { filter =>
HoodieDataSourceHelper.splitPartitionAndDataPredicates(
sparkSession, filterExpressions, partitionColumns)
}
(splited.flatMap(_._1), splited.flatMap(_._2))
}
val partitionFiles = getPartitionFiles(partitionFilters, dataFilters)
class BaseFileOnlyViewRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val filePartitions = sparkAdapter.getFilePartitions(sparkSession, partitionFiles, maxSplitBytes)
private val fileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession))

override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
// NOTE: In case list of requested columns doesn't contain the Primary Key one, we
// have to add it explicitly so that
// - Merging could be performed correctly
// - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
// Spark still fetches all the rows to execute the query correctly
//
// It's okay to return columns that have not been requested by the caller, as those nevertheless will be
// filtered out upstream
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)

val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)

val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)

val filePartitions = getPartitions(partitionFilters, dataFilters)

val partitionSchema = StructType(Nil)
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)

val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = tableStructSchema,
val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
filters = filters,
options = optParams,
hadoopConf = sparkSession.sessionState.newHadoopConf()
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = new Configuration(conf)
)

new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema,
requiredSchemaParquetReader, filePartitions)
new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions)
}

private def getPartitionFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionedFile] = {
private def getPartitions(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FilePartition] = {
val partitionDirectories = if (globPaths.isEmpty) {
val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams,
FileStatusCache.getOrCreate(sqlContext.sparkSession))
@@ -89,18 +108,46 @@ class BaseFileOnlyViewRelation(
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}

val partitionFiles = partitionDirectories.flatMap { partition =>
val partitions = partitionDirectories.flatMap { partition =>
partition.files.flatMap { file =>
// TODO move to adapter
// TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = file,
partitionValues = partition.values
// TODO clarify why this is required
partitionValues = InternalRow.empty
)
}
}

partitionFiles.map{ f =>
PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length)
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes

sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes)
}

private def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)

val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
if (failedExprs.nonEmpty) {
val failedFilters = failedExprs.map(p => filters(p._2))
logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
}

catalystExpressions.filter(_.isDefined).map(_.get).toArray
}

/**
* Checks whether the given expression references only partition columns
* (and involves no sub-query)
*/
private def isPartitionPredicate(condition: Expression): Boolean = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver

condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
!SubqueryExpression.hasSubquery(condition)
}

}
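
For context, a sketch of how this relation is typically exercised through the DataSource API; the path, column names, and option value are illustrative, and an active `SparkSession` named `spark` is assumed:

```scala
// Read-optimized query: only base files (Parquet) are scanned, i.e. BaseFileOnlyViewRelation is used
val df = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load("/tmp/hudi/trips")

// Only `uuid` and `fare` are requested; doBuildScan appends the mandatory columns (e.g. the record key)
// so that merging and count()-style queries stay correct, while all other columns are pruned at the
// Parquet level and the `fare > 10.0` predicate is pushed down as a data filter
df.select("uuid", "fare").where("fare > 10.0").show()
```
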