@@ -81,18 +81,33 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
private val sanitizedTableName = AvroSchemaUtils.getAvroRecordQualifiedName(tableName)

/**
* Support batch needs to remain consistent, even if one side of a bootstrap merge can support
* while the other side can't
* Flag saying whether vectorized reading is supported.
*/
private var supportBatchCalled = false
private var supportBatchResult = false
private var supportVectorizedRead = false

/**
* Flag saying whether batch output is supported.
*/
private var supportReturningBatch = false

/**
* Checks whether the file format supports vectorized reading; see SPARK-40918.
*
* NOTE: for MOR reads, even a file slice that contains only a base file can be read from Parquet with the
* vectorized reader, but the output of the whole data-source-scan phase cannot be returned as batches:
* whenever a file slice contains log files, it must be read by the file group reader, and since merging is
* currently performed row by row, the merged result is row-based as well. Because we cannot assume that
* every file slice contains only a base file, the returning-batch flag must be set back to false.
*
*/
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if (!supportBatchCalled || supportBatchResult) {
supportBatchCalled = true
supportBatchResult = !isMOR && !isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema)
}
supportBatchResult
val superSupportBatch = super.supportBatch(sparkSession, schema)
supportVectorizedRead = !isIncremental && !isBootstrap && superSupportBatch
supportReturningBatch = !isMOR && supportVectorizedRead
logInfo(s"supportReturningBatch: $supportReturningBatch, supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " +
s"isBootstrap: $isBootstrap, superSupportBatch: $superSupportBatch")
supportReturningBatch
}
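
To summarize the derivation above as a standalone sketch (the QueryTraits wrapper and batchFlags helper are hypothetical names, not in this PR; the real method takes its inputs from fields of the format class and from super.supportBatch):

// Standalone sketch of the flag derivation above; QueryTraits and batchFlags are illustrative only.
final case class QueryTraits(isMOR: Boolean, isIncremental: Boolean, isBootstrap: Boolean)

def batchFlags(traits: QueryTraits, parquetSupportsBatch: Boolean): (Boolean, Boolean) = {
  // Vectorized Parquet reads are ruled out only for incremental and bootstrap queries,
  // and require Spark's own ParquetFileFormat to report the schema as batch-friendly.
  val supportVectorizedRead = !traits.isIncremental && !traits.isBootstrap && parquetSupportsBatch
  // Batches may be returned to the scan only when no row-level file group merge can occur,
  // which rules out MOR because log files force row-based merging.
  val supportReturningBatch = !traits.isMOR && supportVectorizedRead
  (supportVectorizedRead, supportReturningBatch)
}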

//for partition columns that we read from the file, we don't want them to be constant column vectors so we
@@ -154,8 +169,16 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName)
val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportVectorizedRead,
spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
val fileGroupParquetFileReader = if (isMOR && supportVectorizedRead) {
      // The file group reader must read records without the vectorized reader, because merging is performed at the row level.
      // TODO: consider supporting the vectorized reader in the file group reader
spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(vectorized = false,
spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
} else {
parquetFileReader
}
val broadcastedStorageConf = spark.sparkContext.broadcast(new SerializableConfiguration(augmentedStorageConf.unwrap()))
val fileIndexProps: TypedProperties = HoodieFileIndex.getConfigProperties(spark, options, null)

@@ -173,7 +196,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(storageConf).setBasePath(tablePath).build
val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig)
val readerContext = new SparkFileFormatInternalRowReaderContext(fileGroupParquetFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig)
val props = metaClient.getTableConfig.getProps
options.foreach(kv => props.setProperty(kv._1, kv._2))
props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction))
@@ -200,7 +223,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
}
// CDC queries.
case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping =>
buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, fileGroupParquetFileReader.value, storageConf, fileIndexProps, requiredSchema)

case _ =>
readBaseFile(file, parquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes,
@@ -872,14 +872,7 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
upsertData(df2, tempRecordPath, isCow)

// after implicit type change, read the table with vectorized read enabled
//fg reader with mor does not support vectorized currently and will auto read by row
if (isCow || !useFileGroupReader) {
assertThrows(classOf[SparkException]){
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
readTable(tempRecordPath, useFileGroupReader)
}
}
} else {
assertThrows(classOf[SparkException]){
withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
readTable(tempRecordPath, useFileGroupReader)
}
@@ -252,6 +252,17 @@ object Spark33ParquetReader extends SparkParquetReaderBuilder {
sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean
)

// Should always be set by FileSourceScanExec while creating this.
// Check conf before checking the option, to allow working around an issue by changing conf.
val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
[inline review thread on this line]
Contributor: should we fix the other version of parquet readers?
Author: Fixing only Spark 3.3 is fine, because the relevant Spark changes were integrated in the versions after 3.3; only 3.3 needs to be made compatible. Refer to apache/spark#38397.
options.get(FileFormat.OPTION_RETURNING_BATCH)
.getOrElse {
throw new IllegalArgumentException(
"OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
"To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
}
.equals("true")

val parquetOptions = new ParquetOptions(options, sqlConf)
new Spark33ParquetReader(enableVectorizedReader = vectorized,
datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
@@ -266,7 +277,7 @@ object Spark33ParquetReader extends SparkParquetReaderBuilder {
timestampConversion = sqlConf.isParquetINT96TimestampConversion,
enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
capacity = sqlConf.parquetVectorizedReaderBatchSize,
returningBatch = sqlConf.parquetVectorizedReaderEnabled,
returningBatch = returningBatch,
enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
timeZoneId = Some(sqlConf.sessionLocalTimeZone))
}
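
For context on the review thread above: in Spark 3.3 and later the scan node is responsible for setting FileFormat.OPTION_RETURNING_BATCH before the reader is built. A minimal sketch of that hand-off is below; the readerOptionsFor helper is hypothetical and not part of this PR, while FileFormat.OPTION_RETURNING_BATCH and supportBatch mirror the real Spark API:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.types.StructType

// Sketch: thread the batch decision into the reader options the way FileSourceScanExec
// does for ParquetFileFormat in Spark 3.3+, so the option lookup above never hits the
// IllegalArgumentException branch.
def readerOptionsFor(format: FileFormat,
                     spark: SparkSession,
                     schema: StructType,
                     baseOptions: Map[String, String]): Map[String, String] = {
  val returningBatch = format.supportBatch(spark, schema)
  baseOptions + (FileFormat.OPTION_RETURNING_BATCH -> returningBatch.toString)
}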