diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2d71c4d738886..92d85b041cec8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; @@ -1540,7 +1541,8 @@ public double getParquetCompressionRatio() { } public CompressionCodecName getParquetCompressionCodec() { - return CompressionCodecName.fromConf(getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)); + String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); + return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName); } public boolean parquetDictionaryEnabled() { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index b288289ac82ec..c963806416061 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -177,14 +177,24 @@ object HoodieSparkUtils extends SparkAdapterSupport { * Convert Filters to Catalyst Expressions and joined by And. If convert success return an * Non-Empty Option[Expression],or else return None. */ - def convertToCatalystExpressions(filters: Array[Filter], - tableSchema: StructType): Option[Expression] = { - val expressions = filters.map(convertToCatalystExpression(_, tableSchema)) + def convertToCatalystExpressions(filters: Seq[Filter], + tableSchema: StructType): Seq[Option[Expression]] = { + filters.map(convertToCatalystExpression(_, tableSchema)) + } + + + /** + * Convert Filters to Catalyst Expressions and joined by And. If convert success return an + * Non-Empty Option[Expression],or else return None. 
+ */ + def convertToCatalystExpression(filters: Array[Filter], + tableSchema: StructType): Option[Expression] = { + val expressions = convertToCatalystExpressions(filters, tableSchema) if (expressions.forall(p => p.isDefined)) { if (expressions.isEmpty) { None } else if (expressions.length == 1) { - expressions(0) + expressions.head } else { Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And)) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala similarity index 75% rename from hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala index 5c3035304cee7..4c4ddb5bf016c 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala @@ -24,12 +24,6 @@ package org.apache.spark.sql.avro * If you're looking to convert Avro into "deserialized" [[Row]] (comprised of Java native types), * please check [[AvroConversionUtils]] */ -trait HoodieAvroDeserializerTrait { - final def deserialize(data: Any): Option[Any] = - doDeserialize(data) match { - case opt: Option[_] => opt // As of Spark 3.1, this will return data wrapped with Option, so we fetch the data - case row => Some(row) // For other Spark versions, return the data as is - } - - protected def doDeserialize(data: Any): Any +trait HoodieAvroDeserializer { + def deserialize(data: Any): Option[Any] } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala similarity index 97% rename from hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala index 159d8da74d2db..84ba44b00fbbb 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala @@ -23,6 +23,6 @@ package org.apache.spark.sql.avro * NOTE: This is low-level component operating on Spark internal data-types (comprising [[InternalRow]]). 
* If you're looking to convert "deserialized" [[Row]] into Avro, please check [[AvroConversionUtils]] */ -trait HoodieAvroSerializerTrait { +trait HoodieAvroSerializer { def serialize(catalystData: Any): Any } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 32ed2b16ce639..d84e291e94ee6 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi import org.apache.avro.Schema import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait} +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -43,16 +43,16 @@ import java.util.Locale trait SparkAdapter extends Serializable { /** - * Creates instance of [[HoodieAvroSerializerTrait]] providing for ability to serialize + * Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize * Spark's [[InternalRow]] into Avro payloads */ - def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait + def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer /** - * Creates instance of [[HoodieAvroDeserializerTrait]] providing for ability to deserialize + * Creates instance of [[HoodieAvroDeserializer]] providing for ability to deserialize * Avro payloads into Spark's [[InternalRow]] */ - def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait + def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer /** * Create the SparkRowSerDe. 
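Note: with the rename of HoodieAvroDeserializerTrait to HoodieAvroDeserializer above, the base trait no longer normalizes results into Option[Any] (the final deserialize/doDeserialize split is gone), so each Spark-version-specific implementation is now expected to return Option[Any] itself. A minimal sketch of what such an implementation could look like (ExampleAvroDeserializer and the underlying function are hypothetical, not the adapters shipped in this change):

```scala
import org.apache.spark.sql.avro.HoodieAvroDeserializer

// Hypothetical sketch: normalize the wrapped deserializer's result into Option[Any],
// mirroring the logic that previously lived in HoodieAvroDeserializerTrait.deserialize.
class ExampleAvroDeserializer(underlying: Any => Any) extends HoodieAvroDeserializer {
  override def deserialize(data: Any): Option[Any] =
    underlying(data) match {
      case opt: Option[_] => opt // Spark 3.1-style deserializers already return an Option
      case row => Some(row)      // earlier Spark versions return the deserialized value directly
    }
}
```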
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 94e080cae4804..f9676c6c477be 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -19,6 +19,13 @@ package org.apache.hudi.testutils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -28,6 +35,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -42,6 +50,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.table.HoodieSparkTable; @@ -50,14 +59,11 @@ import org.apache.hudi.testutils.providers.HoodieWriteClientProvider; import org.apache.hudi.testutils.providers.SparkProvider; import org.apache.hudi.timeline.service.TimelineService; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; @@ -69,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; @@ -348,6 +355,21 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean .withRollbackUsingMarkers(rollbackUsingMarkers); } + protected Dataset toDataset(List records, Schema schema) { + List avroRecords = records.stream() + .map(r -> { + HoodieRecordPayload payload = (HoodieRecordPayload) r.getData(); + try { + return (GenericRecord) payload.getInsertValue(schema).get(); + } catch (IOException e) { + throw new HoodieIOException("Failed to extract Avro payload", e); + } + }) + .collect(Collectors.toList()); + JavaRDD jrdd = jsc.parallelize(avroRecords, 2); + return AvroConversionUtils.createDataFrame(jrdd.rdd(), schema.toString(), spark); + } + protected int incrementTimelineServicePortToUse() { // Increment the timeline service port for each individual test // to avoid port reuse causing failures diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 221b52e77e674..8cadb7e884b9f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -133,7 +133,7 @@ public HoodieMetadataPayload(Option recordOpt) { // This can be simplified using SpecificData.deepcopy once this bug is fixed // https://issues.apache.org/jira/browse/AVRO-1811 // - // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" nad "type" fields + // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" and "type" fields // for it to be handled appropriately, therefore these fields have to be reflected // in any (read-)projected schema key = record.get(KEY_FIELD_NAME).toString(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala index 8e94805328c69..adc34afc3b753 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyViewRelation.scala @@ -18,63 +18,82 @@ package org.apache.hudi +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - +import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter - import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} -import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.spark.sql.types.{BooleanType, StructType} +import org.apache.spark.sql.types.StructType /** - * The implement of [[BaseRelation]], which is used to respond to query that only touches the base files(Parquet), - * like query COW tables in Snapshot-Query and Read_Optimized mode and MOR tables in Read_Optimized mode. + * [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying + * modes: + *
+ * <ul>
+ *   <li>For COW tables: Snapshot</li>
+ *   <li>For MOR tables: Read-optimized</li>
+ * </ul>
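+ *
+ * For example, both of the following reads would be served by this relation (illustrative only;
+ * `spark`, `cowTablePath` and `morTablePath` are assumed to exist):
+ * {{{
+ *   // Snapshot query on a COW table
+ *   val cowDf = spark.read.format("hudi")
+ *     .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ *     .load(cowTablePath)
+ *
+ *   // Read-optimized query on a MOR table
+ *   val morDf = spark.read.format("hudi")
+ *     .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ *     .load(morTablePath)
+ * }}}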
+ * + * NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the + * fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists + * as part of the record payload. In some cases, however, partition path might not necessarily be equal to the + * verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect + * partition field values being written */ -class BaseFileOnlyViewRelation( - sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], - userSchema: Option[StructType], - globPaths: Seq[Path] - ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { - - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") - - val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) - .getOrElse(Literal(true, BooleanType)) - val (partitionFilters, dataFilters) = { - val splited = filters.map { filter => - HoodieDataSourceHelper.splitPartitionAndDataPredicates( - sparkSession, filterExpressions, partitionColumns) - } - (splited.flatMap(_._1), splited.flatMap(_._2)) - } - val partitionFiles = getPartitionFiles(partitionFilters, dataFilters) +class BaseFileOnlyViewRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType], + globPaths: Seq[Path]) + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { - val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val filePartitions = sparkAdapter.getFilePartitions(sparkSession, partitionFiles, maxSplitBytes) + private val fileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, + FileStatusCache.getOrCreate(sqlContext.sparkSession)) + + override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { + // NOTE: In case list of requested columns doesn't contain the Primary Key one, we + // have to add it explicitly so that + // - Merging could be performed correctly + // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], + // Spark still fetches all the rows to execute the query correctly + // + // It's okay to return columns that have not been requested by the caller, as those nevertheless will be + // filtered out upstream + val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) + + val (requiredAvroSchema, requiredStructSchema) = + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) + + val filterExpressions = convertToExpressions(filters) + val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate) + + val filePartitions = getPartitions(partitionFilters, dataFilters) + + val partitionSchema = StructType(Nil) + val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString) + val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString) - val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = sparkSession, - dataSchema = tableStructSchema, - partitionSchema = StructType(Nil), - requiredSchema = tableStructSchema, + val baseFileReader = 
createBaseFileReader( + spark = sparkSession, + partitionSchema = partitionSchema, + tableSchema = tableSchema, + requiredSchema = requiredSchema, filters = filters, options = optParams, - hadoopConf = sparkSession.sessionState.newHadoopConf() + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = new Configuration(conf) ) - new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema, - requiredSchemaParquetReader, filePartitions) + new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions) } - private def getPartitionFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionedFile] = { + private def getPartitions(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FilePartition] = { val partitionDirectories = if (globPaths.isEmpty) { val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) @@ -89,18 +108,46 @@ class BaseFileOnlyViewRelation( inMemoryFileIndex.listFiles(partitionFilters, dataFilters) } - val partitionFiles = partitionDirectories.flatMap { partition => + val partitions = partitionDirectories.flatMap { partition => partition.files.flatMap { file => + // TODO move to adapter + // TODO fix, currently assuming parquet as underlying format HoodieDataSourceHelper.splitFiles( sparkSession = sparkSession, file = file, - partitionValues = partition.values + // TODO clarify why this is required + partitionValues = InternalRow.empty ) } } - partitionFiles.map{ f => - PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length) + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + + sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes) + } + + private def convertToExpressions(filters: Array[Filter]): Array[Expression] = { + val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) + + val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty } + if (failedExprs.nonEmpty) { + val failedFilters = failedExprs.map(p => filters(p._2)) + logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})") } + + catalystExpressions.filter(_.isDefined).map(_.get).toArray } + + /** + * Checks whether given expression only references only references partition columns + * (and involves no sub-query) + */ + private def isPartitionPredicate(condition: Expression): Boolean = { + // Validates that the provided names both resolve to the same entity + val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver + + condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } && + !SubqueryExpression.hasSubquery(condition) + } + } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 1e2946dd26e88..e07b316d48db3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -22,38 +22,70 @@ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.io.hfile.CacheConfig +import 
org.apache.hadoop.mapred.JobConf +import org.apache.hudi.HoodieBaseRelation.isMetadataTable import org.apache.hudi.common.config.SerializableConfiguration import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.io.storage.HoodieHFileReader -import org.apache.hudi.metadata.HoodieTableMetadata +import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata} import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.sql.{Row, SQLContext, SparkSession} import scala.collection.JavaConverters._ import scala.util.Try case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String) +case class HoodieTableState(recordKeyField: String, + preCombineFieldOpt: Option[String]) + /** * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. */ -abstract class HoodieBaseRelation( - val sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], - userSchema: Option[StructType]) - extends BaseRelation with PrunedFilteredScan with Logging{ +abstract class HoodieBaseRelation(val sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType]) + extends BaseRelation with PrunedFilteredScan with Logging { protected val sparkSession: SparkSession = sqlContext.sparkSession + protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + protected lazy val jobConf = new JobConf(conf) + + // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one + // NOTE: This is historical behavior which is preserved as is + protected lazy val recordKeyField: String = + if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD + else metaClient.getTableConfig.getRecordKeyFieldProp + + protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty + + /** + * @VisibleInTests + */ + lazy val mandatoryColumns: Seq[String] = { + if (isMetadataTable(metaClient)) { + Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) + } else { + Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) + } + } + + protected lazy val specifiedQueryInstant: Option[String] = + optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) + protected lazy val tableAvroSchema: Schema = { val schemaUtil = new TableSchemaResolver(metaClient) Try(schemaUtil.getTableAvroSchema).getOrElse( @@ -81,6 +113,34 @@ abstract class HoodieBaseRelation( } override def schema: StructType = tableStructSchema + + /** + * This method controls whether relation will be producing + *
+ * <ul>
+ *   <li>[[Row]], when this method returns true</li>
+ *   <li>[[InternalRow]], when this method returns false</li>
+ * </ul>
+ * + * Returning [[InternalRow]] directly enables us to save on needless ser/de loop from [[InternalRow]] (being + * produced by file-reader) to [[Row]] and back + */ + override final def needConversion: Boolean = false + + /** + * NOTE: DO NOT OVERRIDE THIS METHOD + */ + override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] + // Please check [[needConversion]] scala-doc for more details + doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]] + } + + protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] + + protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { + val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) + requestedColumns ++ missing + } } object HoodieBaseRelation { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index fb12549f620bd..40299cfdcd6f1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.sources.Filter @@ -33,43 +33,6 @@ import scala.collection.JavaConverters._ object HoodieDataSourceHelper extends PredicateHelper { - /** - * Partition the given condition into two sequence of conjunctive predicates: - * - predicates that can be evaluated using metadata only. - * - other predicates. - */ - def splitPartitionAndDataPredicates( - spark: SparkSession, - condition: Expression, - partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = { - splitConjunctivePredicates(condition).partition( - isPredicateMetadataOnly(spark, _, partitionColumns)) - } - - /** - * Check if condition can be evaluated using only metadata. In Delta, this means the condition - * only references partition columns and involves no subquery. - */ - def isPredicateMetadataOnly( - spark: SparkSession, - condition: Expression, - partitionColumns: Seq[String]): Boolean = { - isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) && - !SubqueryExpression.hasSubquery(condition) - } - - /** - * Does the predicate only contains partition columns? 
- */ - def isPredicatePartitionColumnsOnly( - spark: SparkSession, - condition: Expression, - partitionColumns: Seq[String]): Boolean = { - val nameEquality = spark.sessionState.analyzer.resolver - condition.references.forall { r => - partitionColumns.exists(nameEquality(r.name, _)) - } - } /** * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala index 9f2d7d9e0380a..7e8f62bd2500a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -18,56 +18,37 @@ package org.apache.hudi -import org.apache.spark.{Partition, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException} -import org.apache.spark.sql.types.StructType +import org.apache.spark.{Partition, TaskContext} /** - * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]]. - * - * This class will extract the fields needed according to [[requiredColumns]] and - * return iterator of [[org.apache.spark.sql.Row]] directly. + * TODO eval if we actually need it */ -class HoodieFileScanRDD( - @transient private val sparkSession: SparkSession, - requiredColumns: Array[String], - schema: StructType, - readFunction: PartitionedFile => Iterator[InternalRow], - @transient val filePartitions: Seq[FilePartition]) - extends RDD[Row](sparkSession.sparkContext, Nil) { - - private val requiredSchema = { - val nameToStructField = schema.map(field => (field.name, field)).toMap - StructType(requiredColumns.map(nameToStructField)) - } - - private val requiredFieldPos = HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema) - - override def compute(split: Partition, context: TaskContext): Iterator[Row] = { - val iterator = new Iterator[Object] with AutoCloseable { +class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + @transient fileSplits: Seq[FilePartition]) + extends HoodieUnsafeRDD(sparkSession.sparkContext) { + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val iterator = new Iterator[InternalRow] with AutoCloseable { private[this] val files = split.asInstanceOf[FilePartition].files.toIterator - private[this] var currentFile: PartitionedFile = null - private[this] var currentIterator: Iterator[Object] = null + private[this] var currentFile: PartitionedFile = _ + private[this] var currentIterator: Iterator[InternalRow] = _ override def hasNext: Boolean = { (currentIterator != null && currentIterator.hasNext) || nextIterator() } - def next(): Object = { - currentIterator.next() - } + def next(): InternalRow = currentIterator.next() /** Advances to the next file. Returns true if a new non-empty iterator is available. 
*/ private def nextIterator(): Boolean = { if (files.hasNext) { - currentFile = files.next() - logInfo(s"Reading File $currentFile") + currentFile = files.next() currentIterator = readFunction(currentFile) try { @@ -93,17 +74,8 @@ class HoodieFileScanRDD( // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener[Unit](_ => iterator.close()) - // extract required columns from row - val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema( - iterator.asInstanceOf[Iterator[InternalRow]], - requiredSchema, - requiredFieldPos) - - // convert InternalRow to Row and return - val converter = CatalystTypeConverters.createToScalaConverter(requiredSchema) - iterAfterExtract.map(converter(_).asInstanceOf[Row]) + iterator.asInstanceOf[Iterator[InternalRow]] } - override protected def getPartitions: Array[Partition] = filePartitions.toArray - + override protected def getPartitions: Array[Partition] = fileSplits.toArray } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 96fe47e0219d4..d58050106606b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -35,7 +35,6 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -53,10 +52,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, fullSchemaFileReader: PartitionedFile => Iterator[InternalRow], requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow], - tableState: HoodieMergeOnReadTableState, + tableState: HoodieTableState, tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema) - extends RDD[InternalRow](sc, Nil) { + requiredSchema: HoodieTableSchema, + @transient fileSplits: List[HoodieMergeOnReadFileSplit]) + extends HoodieUnsafeRDD(sc) { private val confBroadcast = sc.broadcast(new SerializableWritable(config)) private val recordKeyField = tableState.recordKeyField @@ -97,12 +97,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, iter } - override protected def getPartitions: Array[Partition] = { - tableState - .hoodieRealtimeFileSplits - .zipWithIndex - .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray - } + override protected def getPartitions: Array[Partition] = + fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray private def getConfig: Configuration = { val conf = confBroadcast.value.value diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala new file mode 100644 index 0000000000000..3f95746a54669 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.{Partition, SparkContext, TaskContext} + +/** + * !!! PLEASE READ CAREFULLY !!! + * + * Base class for all of the custom low-overhead RDD implementations for Hudi. + * + * To keep memory allocation footprint as low as possible, each inheritor of this RDD base class + * + *
+ *   1. Does NOT deserialize from [[InternalRow]] to [[Row]] (therefore only providing access to
+ *   Catalyst internal representations (often mutable) of the read row)
+ *
+ *   2. DOES NOT COPY UNDERLYING ROW OUT OF THE BOX, meaning that
+ *
+ *      a) access to this RDD is NOT thread-safe
+ *
+ *      b) iterating over it reference to a _mutable_ underlying instance (of [[InternalRow]]) is
+ *      b) when iterating over it, a reference to a _mutable_ underlying instance (of [[InternalRow]]) is
+ *      previous reference becomes **invalid**. Therefore, you will have to copy underlying mutable
+ *      instance of [[InternalRow]] if you plan to access it after [[Iterator#next()]] is invoked (filling
+ *      it with the next row's payload)
+ *
+ *      c) due to item b) above, no operation other than the iteration will produce meaningful
+ *      c) due to item b) above, any operation other than plain iteration will not produce
+ *      meaningful results and will likely fail [1]
+ * + * [1] For example, [[RDD#collect]] method on this implementation would not work correctly, as it's + * simply using Scala's default [[Iterator#toArray]] method which will simply concat all the references onto + * the same underlying mutable object into [[Array]]. Instead each individual [[InternalRow]] _has to be copied_, + * before concatenating into the final output. Please refer to [[HoodieRDDUtils#collect]] for more details. + * + * NOTE: It enforces, for ex, that all of the RDDs implement [[compute]] method returning + * [[InternalRow]] to avoid superfluous ser/de + */ +abstract class HoodieUnsafeRDD(@transient sc: SparkContext) + extends RDD[InternalRow](sc, Nil) { + + def compute(split: Partition, context: TaskContext): Iterator[InternalRow] + + override final def collect(): Array[InternalRow] = + throw new UnsupportedOperationException( + "This method will not function correctly, please refer to scala-doc for HoodieUnsafeRDD" + ) +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index b9d18c68d3d60..8308e3b7ee8ad 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -19,7 +19,6 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{GlobPattern, Path} -import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient @@ -28,11 +27,11 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} import scala.collection.JavaConversions._ @@ -47,9 +46,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, val metaClient: HoodieTableMetaClient) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - private val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - private val jobConf = new JobConf(conf) - private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") @@ -77,8 +73,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex() - private val preCombineFieldOpt = getPrecombineFieldProperty - // Record filters making sure that only records w/in the requested bounds are being fetched as part of the // scan collected by this relation private lazy val incrementalSpanRecordsFilters: Seq[Filter] = { @@ -88,18 +82,16 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, Seq(isNotNullFilter, largerThanFilter, lessThanFilter) } 
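Note: per the HoodieUnsafeRDD scaladoc introduced above, the underlying InternalRow instances are mutable and reused by the file readers, so rows must be copied before being materialized. A minimal illustration of that contract (collectRows is hypothetical; the canonical helper is HoodieUnsafeRDDUtils.collect, added later in this diff, which additionally leans on Spark-internal APIs such as mapPartitionsInternal and MutablePair to keep allocations low):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical helper (not part of this change): copy every mutable InternalRow before
// collecting, otherwise all collected entries would alias the same reused row instance.
def collectRows(rdd: RDD[InternalRow]): Array[InternalRow] =
  rdd.mapPartitions(iter => iter.map(_.copy())).collect()
```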
- private lazy val mandatoryColumns = { + override lazy val mandatoryColumns: Seq[String] = { // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in // cases when no columns are requested to be fetched (for ex, when using {@code count()} API) Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) } - override def needConversion: Boolean = false - - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { if (fileIndex.isEmpty) { - sqlContext.sparkContext.emptyRDD[Row] + sqlContext.sparkContext.emptyRDD[InternalRow] } else { logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") logDebug(s"buildScan filters = ${filters.mkString(",")}") @@ -148,20 +140,20 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, hadoopConf = new Configuration(conf) ) - val hoodieTableState = HoodieMergeOnReadTableState(fileIndex, HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) + val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) // TODO implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately // filtered, since file-reader might not be capable to perform filtering - val rdd = new HoodieMergeOnReadRDD( + new HoodieMergeOnReadRDD( sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader, hoodieTableState, tableSchema, - requiredSchema + requiredSchema, + fileIndex ) - rdd.asInstanceOf[RDD[Row]] } } @@ -225,9 +217,4 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }) } - - private def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { - val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) - requestedColumns ++ missing - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 7c1a3540c814e..6156054b4f45b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -20,22 +20,19 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{createBaseFileReader, isMetadataTable} +import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes -import org.apache.hudi.metadata.HoodieMetadataPayload import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import 
org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} import scala.collection.JavaConverters._ @@ -46,10 +43,6 @@ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], maxCompactionMemoryInBytes: Long, mergeType: String) -case class HoodieMergeOnReadTableState(hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit], - recordKeyField: String, - preCombineFieldOpt: Option[String]) - class MergeOnReadSnapshotRelation(sqlContext: SQLContext, optParams: Map[String, String], val userSchema: Option[StructType], @@ -57,38 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val metaClient: HoodieTableMetaClient) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - private val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - private val jobConf = new JobConf(conf) - private val mergeType = optParams.getOrElse( DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_MERGE.defaultValue) private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) - // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one - // NOTE: This is historical behavior which is preserved as is - private val recordKeyField = { - if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD - else metaClient.getTableConfig.getRecordKeyFieldProp - } - - private val preCombineFieldOpt = getPrecombineFieldProperty - - private lazy val mandatoryColumns = { - if (isMetadataTable(metaClient)) { - Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) - } else { - Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) - } - } - - override def needConversion: Boolean = false - - private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlCommonUtils.formatQueryInstant) - - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") log.debug(s" buildScan filters = ${filters.mkString(",")}") @@ -137,12 +105,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, hadoopConf = new Configuration(conf) ) - val tableState = HoodieMergeOnReadTableState(fileIndex, recordKeyField, preCombineFieldOpt) + val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt) - val rdd = new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, - requiredSchemaParquetReader, tableState, tableSchema, requiredSchema) - - rdd.asInstanceOf[RDD[Row]] + new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, + requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex) } def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { @@ -193,7 +159,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p))) val partitionFilterExpression = - 
HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema) + HoodieSparkUtils.convertToCatalystExpression(partitionFilters, tableStructSchema) val convertedPartitionFilterExpression = HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq) @@ -231,11 +197,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } } } - - private def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { - val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) - requestedColumns ++ missing - } } object MergeOnReadSnapshotRelation { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala new file mode 100644 index 0000000000000..1ac8fa098119f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import org.apache.hudi.HoodieUnsafeRDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.util.MutablePair + +/** + * Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]] + */ +object HoodieUnsafeRDDUtils { + + /** + * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly + * copied [[Array]] of [[InternalRow]]s + */ + def collect(rdd: HoodieUnsafeRDD): Array[InternalRow] = { + rdd.mapPartitionsInternal { iter => + // NOTE: We're leveraging [[MutablePair]] here to avoid unnecessary allocations, since + // a) iteration is performed lazily and b) iteration is single-threaded (w/in partition) + val pair = new MutablePair[InternalRow, Null]() + iter.map(row => pair.update(row.copy(), null)) + } + .map(p => p._1) + .collect() + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala similarity index 89% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala index 050efbd3d22c2..4a3a7c4526dee 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.avro import org.apache.avro.Schema import org.apache.spark.sql.types.DataType -class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) - extends HoodieAvroSerializerTrait { +class HoodieSparkAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) + extends HoodieAvroSerializer { val avroSerializer = new AvroSerializer(rootCatalystType, rootAvroType, nullable) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala index 9b1b88d34ce18..8aa47ffc2f880 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala @@ -17,11 +17,9 @@ package org.apache.hudi -import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression - -import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith} -import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -93,7 +91,7 @@ class TestConvertFilterToCatalystExpression { } else { expectExpression } - val exp = convertToCatalystExpressions(filters, tableSchema) + val exp = convertToCatalystExpression(filters, tableSchema) if (removeQuotesIfNeed == null) { 
assertEquals(exp.isEmpty, true) } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala index 315a14c9de03d..18b639f2f9bd2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lit} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} @@ -39,6 +40,8 @@ import scala.collection.JavaConversions._ @Tag("functional") class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness { + private val log = LogManager.getLogger(classOf[TestMORDataSourceStorage]) + val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala new file mode 100644 index 0000000000000..a963081749455 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.avro.Schema +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator} +import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.NonpartitionedKeyGenerator +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.parquet.hadoop.util.counters.BenchmarkCounter +import org.apache.spark.HoodieUnsafeRDDUtils +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, Row, SaveMode} +import org.apache.spark.sql.catalyst.InternalRow +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Test} + +import scala.:+ +import scala.collection.JavaConverters._ + +@Tag("functional") +class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with Logging { + + val defaultWriteOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieMetadataConfig.ENABLE.key -> "true", + // NOTE: It's critical that we use non-partitioned table, since the way we track amount of bytes read + // is not robust, and works most reliably only when we read just a single file. As such, making table + // non-partitioned makes it much more likely just a single file will be written + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName + ) + + @Test + def testBaseFileOnlyViewRelation(): Unit = { + val tablePath = s"$basePath/cow" + val targetRecordsCount = 100 + val (_, schema) = bootstrapTable(tablePath, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, targetRecordsCount, + defaultWriteOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, 0.0) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test COW / Snapshot + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, "", projectedColumnsReadStats) + } + + @Test + def testMergeOnReadSnapshotRelationWithDeltaLogs(): Unit = { + val tablePath = s"$basePath/mor-with-logs" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.5 + + val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val 
projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Stats for the reads fetching _all_ columns (note, how amount of bytes read + // is invariant of the # of columns) + val fullColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 14665), + ("rider,driver", 14665), + ("rider,driver,tip_history", 14665)) + else if (HoodieSparkUtils.isSpark2) + // TODO re-enable tests (these tests are very unstable currently) + Array( + ("rider", -1), + ("rider,driver", -1), + ("rider,driver,tip_history", -1)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test MOR / Snapshot / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Snapshot / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats) + + // Test MOR / Read Optimized + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats) + } + + @Test + def testMergeOnReadSnapshotRelationWithNoDeltaLogs(): Unit = { + val tablePath = s"$basePath/mor-no-logs" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.0 + + val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // + // Test #1: MOR table w/o Delta Logs + // + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test MOR / Snapshot / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Snapshot / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Read Optimized + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats) + } + + // TODO add test for incremental query of the table with logs + @Test + def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = { + val tablePath = s"$basePath/mor-no-logs" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.0 + + val opts: Map[String, String] = + // NOTE: Parquet Compression is disabled as it was leading to non-deterministic outcomes when testing + // against Spark 2.x + defaultWriteOpts ++ Seq(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key -> "") + + val (_, schema) =
bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, opts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 4219), + ("rider,driver", 4279), + ("rider,driver,tip_history", 5186)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 4430), + ("rider,driver", 4530), + ("rider,driver,tip_history", 5487)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Stats for the reads fetching _all_ columns (note, how amount of bytes read + // is invariant of the # of columns) + val fullColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 19683), + ("rider,driver", 19683), + ("rider,driver,tip_history", 19683)) + else if (HoodieSparkUtils.isSpark2) + // TODO re-enable tests (these tests are very unstable currently) + Array( + ("rider", -1), + ("rider,driver", -1), + ("rider,driver,tip_history", -1)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + val incrementalOpts: Map[String, String] = Map( + DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001" + ) + + // Test MOR / Incremental / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, + projectedColumnsReadStats, incrementalOpts) + + // Test MOR / Incremental / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, + fullColumnsReadStats, incrementalOpts) + } + + + // Test routine + private def runTest(tableState: TableState, + queryType: String, + mergeType: String, + expectedStats: Array[(String, Long)], + additionalOpts: Map[String, String] = Map.empty): Unit = { + val tablePath = tableState.path + val readOpts = defaultWriteOpts ++ Map( + "path" -> tablePath, + DataSourceReadOptions.QUERY_TYPE.key -> queryType, + DataSourceReadOptions.REALTIME_MERGE.key -> mergeType + ) ++ additionalOpts + + val ds = new DefaultSource() + val relation: HoodieBaseRelation = ds.createRelation(spark.sqlContext, readOpts).asInstanceOf[HoodieBaseRelation] + + for ((columnListStr, expectedBytesRead) <- expectedStats) { + val targetColumns = columnListStr.split(",") + + println(s"Running test for $tablePath / $queryType / $mergeType / $columnListStr") + + val (rows, bytesRead) = measureBytesRead { () => + val rdd = relation.buildScan(targetColumns, Array.empty).asInstanceOf[HoodieUnsafeRDD] + HoodieUnsafeRDDUtils.collect(rdd) + } + + val targetRecordCount = tableState.targetRecordCount; + val targetUpdatedRecordsRatio = tableState.targetUpdatedRecordsRatio + + val expectedRecordCount = + if (DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL.equals(mergeType)) targetRecordCount * (1 + targetUpdatedRecordsRatio) + else targetRecordCount + + assertEquals(expectedRecordCount, rows.length) + if (expectedBytesRead != -1) { + assertEquals(expectedBytesRead, bytesRead) + } else { + logWarning(s"Not matching bytes read ($bytesRead)") + } + + val readColumns = targetColumns ++ relation.mandatoryColumns + val (_, projectedStructType) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns) + + val row: InternalRow = rows.take(1).head + + // This check is mostly about 
making sure InternalRow deserializes properly into projected schema + val deserializedColumns = row.toSeq(projectedStructType) + assertEquals(readColumns.length, deserializedColumns.size) + } + } + + private def bootstrapTable(path: String, + tableType: String, + recordCount: Int, + opts: Map[String, String], + populateMetaFields: Boolean, + dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = { + val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345)) + + // Bulk Insert Operation + val schema = + if (populateMetaFields) HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS + else HoodieTestDataGenerator.AVRO_SCHEMA + + val records = dataGen.generateInserts("001", recordCount) + val inputDF: Dataset[Row] = toDataset(records, HoodieTestDataGenerator.AVRO_SCHEMA) + + inputDF.write.format("org.apache.hudi") + .options(opts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(path) + + (records.asScala.toList, schema) + } + + private def bootstrapMORTable(path: String, + recordCount: Int, + updatedRecordsRatio: Double, + opts: Map[String, String], + populateMetaFields: Boolean, + dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = { + val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345)) + + // Step 1: Bootstrap table w/ N records (t/h bulk-insert) + val (insertedRecords, schema) = bootstrapTable(path, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts, populateMetaFields, Some(dataGen)) + + if (updatedRecordsRatio == 0) { + (insertedRecords, schema) + } else { + val updatesCount = (insertedRecords.length * updatedRecordsRatio).toInt + val recordsToUpdate = insertedRecords.take(updatesCount) + val updatedRecords = dataGen.generateUpdates("002", recordsToUpdate.asJava) + + // Step 2: Update M records out of those (t/h update) + val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA) + + inputDF.write.format("org.apache.hudi") + .options(opts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(path) + + (updatedRecords.asScala.toList ++ insertedRecords.drop(updatesCount), schema) + } + } + + def measureBytesRead[T](f: () => T): (T, Int) = { + // Init BenchmarkCounter to report number of bytes actually read from the Block + BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter, fs.getConf) + val r = f.apply() + val bytesRead = BenchmarkCounter.getBytesRead.toInt + (r, bytesRead) + } + + case class TableState(path: String, schema: Schema, targetRecordCount: Long, targetUpdatedRecordsRatio: Double) +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 5dfa7d9574d9a..fecb01c6a4489 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait, 
Spark2HoodieAvroDeserializer, HoodieAvroSerializer} +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark2AvroDeserializer, HoodieSparkAvroSerializer} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -42,11 +42,11 @@ import scala.collection.mutable.ArrayBuffer */ class Spark2Adapter extends SparkAdapter { - def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait = - new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable) + def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = + new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable) - def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait = - new Spark2HoodieAvroDeserializer(rootAvroType, rootCatalystType) + def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = + new HoodieSpark2AvroDeserializer(rootAvroType, rootCatalystType) override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = { new Spark2RowSerDe(encoder) diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala similarity index 76% rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala index ac2c82f70dacf..2b55c6695e5b2 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala @@ -21,13 +21,15 @@ import org.apache.avro.Schema import org.apache.spark.sql.types.DataType /** - * This is Spark 2 implementation for the [[HoodieAvroDeserializerTrait]] leveraging [[PatchedAvroDeserializer]], + * This is Spark 2 implementation for the [[HoodieAvroDeserializer]] leveraging [[PatchedAvroDeserializer]], * which is just copied over version of [[AvroDeserializer]] from Spark 2.4.4 w/ SPARK-30267 being back-ported to it */ -class Spark2HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) - extends HoodieAvroDeserializerTrait { +class HoodieSpark2AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) + extends HoodieAvroDeserializer { private val avroDeserializer = new PatchedAvroDeserializer(rootAvroType, rootCatalystType) - def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data) + // As of Spark 3.1, this will return data wrapped with Option, so we make sure these interfaces + // are aligned across Spark versions + def deserialize(data: Any): Option[Any] = Some(avroDeserializer.deserialize(data)) } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 8f073bb1cdaaf..5600439410308 100644 --- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -21,7 +21,7 @@ import org.apache.avro.Schema import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.spark3.internal.ReflectUtil -import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait, Spark3HoodieAvroDeserializer, HoodieAvroSerializer} +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3AvroDeserializer, HoodieSparkAvroSerializer} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -43,11 +43,11 @@ import org.apache.spark.sql.{Row, SparkSession} */ class Spark3Adapter extends SparkAdapter { - def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait = - new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable) + def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = + new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable) - def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait = - new Spark3HoodieAvroDeserializer(rootAvroType, rootCatalystType) + def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = + new HoodieSpark3AvroDeserializer(rootAvroType, rootCatalystType) override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = { new Spark3RowSerDe(encoder) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala similarity index 89% rename from hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala rename to hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala index fa03f5d841cfb..bd9ead5a70b6d 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala @@ -21,8 +21,8 @@ import org.apache.avro.Schema import org.apache.hudi.HoodieSparkUtils import org.apache.spark.sql.types.DataType -class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) - extends HoodieAvroDeserializerTrait { +class HoodieSpark3AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) + extends HoodieAvroDeserializer { // SPARK-34404: As of Spark3.2, there is no AvroDeserializer's constructor with Schema and DataType arguments. // So use the reflection to get AvroDeserializer instance. 
@@ -34,5 +34,5 @@ class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataT constructor.newInstance(rootAvroType, rootCatalystType) } - def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data) + def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data) } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java index a57be62461d45..9a62c14e5caa9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java @@ -249,14 +249,17 @@ public List createUpsertRecords(Path srcFolder) throws ParseExcep long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); // 10 for update + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); for (long recordNum = 0; recordNum < 11; recordNum++) { - records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, - "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); + records.add( + dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, + "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } // 4 for insert for (long recordNum = 96; recordNum < 100; recordNum++) { - records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, - "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); + records.add( + dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, + "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } try (ParquetWriter writer = AvroParquetWriter.builder(srcFile) .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
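Note on the renamed Avro deserializer interfaces above: both HoodieSpark2AvroDeserializer and HoodieSpark3AvroDeserializer now implement HoodieAvroDeserializer.deserialize(data: Any): Option[Any], so callers no longer need to special-case the Option wrapping that Spark 3.1 introduced. A minimal caller-side sketch of that contract (illustrative only; toInternalRowOpt is a hypothetical helper, not part of this patch):

  import org.apache.spark.sql.avro.HoodieAvroDeserializer
  import org.apache.spark.sql.catalyst.InternalRow

  // Same code path on Spark 2 and Spark 3: the trait always yields an Option,
  // and we keep only values that deserialized into an InternalRow.
  def toInternalRowOpt(deserializer: HoodieAvroDeserializer, avroRecord: Any): Option[InternalRow] =
    deserializer.deserialize(avroRecord).collect { case row: InternalRow => row }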