diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 4fdf3e4d5a659..2751892017c6c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -81,18 +81,33 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
   private val sanitizedTableName = AvroSchemaUtils.getAvroRecordQualifiedName(tableName)
 
   /**
-   * Support batch needs to remain consistent, even if one side of a bootstrap merge can support
-   * while the other side can't
+   * Flag indicating whether vectorized reading is supported.
    */
-  private var supportBatchCalled = false
-  private var supportBatchResult = false
+  private var supportVectorizedRead = false
 
+  /**
+   * Flag indicating whether batch output is supported.
+   */
+  private var supportReturningBatch = false
+
+  /**
+   * Checks whether the file format supports vectorized reading; see SPARK-40918.
+   *
+   * NOTE: for a MOR read, a file slice that contains only a base file can still be read with the vectorized reader,
+   * but the result of the whole data-source-scan phase cannot be returned as batches,
+   * because any file slice that contains log files has to be read by the file group reader.
+   * Since merging is currently performed row by row, the merged result must also be row-based,
+   * and we cannot assume that every file slice contains only base files.
+   * So the returning-batch flag has to be set back to false.
+   *
+   */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    if (!supportBatchCalled || supportBatchResult) {
-      supportBatchCalled = true
-      supportBatchResult = !isMOR && !isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema)
-    }
-    supportBatchResult
+    val superSupportBatch = super.supportBatch(sparkSession, schema)
+    supportVectorizedRead = !isIncremental && !isBootstrap && superSupportBatch
+    supportReturningBatch = !isMOR && supportVectorizedRead
+    logInfo(s"supportReturningBatch: $supportReturningBatch, supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " +
+      s"isBootstrap: $isBootstrap, superSupportBatch: $superSupportBatch")
+    supportReturningBatch
   }
 
   //for partition columns that we read from the file, we don't want them to be constant column vectors so we
@@ -154,8 +169,16 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
     val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
     val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName)
     val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
-    val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
+    val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportVectorizedRead,
       spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
+    val fileGroupParquetFileReader = if (isMOR && supportVectorizedRead) {
+      // The file group reader must read records without the vectorized reader because merging is performed at the row level.
+      // TODO: consider supporting the vectorized reader in the file group reader
+      spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(vectorized = false,
+        spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
+    } else {
+      parquetFileReader
+    }
     val broadcastedStorageConf = spark.sparkContext.broadcast(new SerializableConfiguration(augmentedStorageConf.unwrap()))
 
     val fileIndexProps: TypedProperties = HoodieFileIndex.getConfigProperties(spark, options, null)
@@ -173,7 +196,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
          case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
            val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
              .builder().setConf(storageConf).setBasePath(tablePath).build
-            val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig)
+            val readerContext = new SparkFileFormatInternalRowReaderContext(fileGroupParquetFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig)
            val props = metaClient.getTableConfig.getProps
            options.foreach(kv => props.setProperty(kv._1, kv._2))
            props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction))
@@ -200,7 +223,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
            }
 
          // CDC queries.
          case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping =>
-            buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
+            buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, fileGroupParquetFileReader.value, storageConf, fileIndexProps, requiredSchema)
          case _ => readBaseFile(file, parquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
index 8eb4c0aee85b0..2929534ffa745 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala
@@ -872,14 +872,7 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAss
     upsertData(df2, tempRecordPath, isCow)
 
     // after implicit type change, read the table with vectorized read enabled
-    //fg reader with mor does not support vectorized currently and will auto read by row
-    if (isCow || !useFileGroupReader) {
-      assertThrows(classOf[SparkException]){
-        withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
-          readTable(tempRecordPath, useFileGroupReader)
-        }
-      }
-    } else {
+    assertThrows(classOf[SparkException]){
      withSQLConf("spark.sql.parquet.enableNestedColumnVectorizedReader" -> "true") {
        readTable(tempRecordPath, useFileGroupReader)
      }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
index a4d5adda191a0..1f49da7352fa2 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -252,6 +252,17 @@ object Spark33ParquetReader extends SparkParquetReaderBuilder {
      sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean
    )
 
+    // Should always be set by FileSourceScanExec while creating this.
+    // Check conf before checking the option, to allow working around an issue by changing conf.
+    val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
" + + "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.") + } + .equals("true") + val parquetOptions = new ParquetOptions(options, sqlConf) new Spark33ParquetReader(enableVectorizedReader = vectorized, datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead, @@ -266,7 +277,7 @@ object Spark33ParquetReader extends SparkParquetReaderBuilder { timestampConversion = sqlConf.isParquetINT96TimestampConversion, enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled, capacity = sqlConf.parquetVectorizedReaderBatchSize, - returningBatch = sqlConf.parquetVectorizedReaderEnabled, + returningBatch = returningBatch, enableRecordFilter = sqlConf.parquetRecordFilterEnabled, timeZoneId = Some(sqlConf.sessionLocalTimeZone)) }