diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala index fe30f61b9298..a3b9c210b983 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala @@ -18,12 +18,30 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan} import org.apache.spark.sql.types.StructType trait HoodieCatalystExpressionUtils { + /** + * Generates instance of [[UnsafeProjection]] projecting row of one [[StructType]] into another [[StructType]] + * + * NOTE: No safety checks are executed to validate that this projection is actually feasible, + * it's up to the caller to make sure that such projection is possible. + * + * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if + * B is a subset of A + */ + def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = { + val attrs = from.toAttributes + val attrsMap = attrs.map(attr => (attr.name, attr)).toMap + val targetExprs = to.fields.map(f => attrsMap(f.name)) + + GenerateUnsafeProjection.generate(targetExprs, attrs) + } + /** * Parses and resolves expression against the attributes of the given table schema. * diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index d8ed17354785..a97743e62fac 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -177,7 +177,7 @@ trait SparkAdapter extends Serializable { def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] /** - * Create hoodie parquet file format. + * Create instance of [[ParquetFileFormat]] */ - def createHoodieParquetFileFormat(): Option[ParquetFileFormat] + def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java new file mode 100644 index 000000000000..dd14dca671b3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +public class AvroSchemaUtils { + + private AvroSchemaUtils() {} + + /** + * Appends provided new fields at the end of the given schema + * + * NOTE: No deduplication is made, this method simply appends fields at the end of the list + * of the source schema as is + */ + public static Schema appendFieldsToSchema(Schema schema, List<Schema.Field> newFields) { + List<Schema.Field> fields = schema.getFields().stream() + .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) + .collect(Collectors.toList()); + fields.addAll(newFields); + + Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()); + newSchema.setFields(fields); + return newSchema; + } + + /** + * Passed in {@code Union} schema and will try to resolve the field with the {@code fieldSchemaFullName} + * w/in the union returning its corresponding schema + * + * @param schema target schema to be inspected + * @param fieldSchemaFullName target field-name to be looked up w/in the union + * @return schema of the field w/in the union identified by the {@code fieldSchemaFullName} + */ + public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) { + if (schema.getType() != Schema.Type.UNION) { + return schema; + } + + List<Schema> innerTypes = schema.getTypes(); + Schema nonNullType = + innerTypes.stream() + .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName)) + .findFirst() + .orElse(null); + + if (nonNullType == null) { + throw new AvroRuntimeException( + String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); + } + + return nonNullType; + } + + /** + * Resolves typical Avro's nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)}, + * decomposing union and returning the target non-null type + */ + public static Schema resolveNullableSchema(Schema schema) { + if (schema.getType() != Schema.Type.UNION) { + return schema; + } + + List<Schema> innerTypes = schema.getTypes(); + Schema nonNullType = + innerTypes.stream() + .filter(it -> it.getType() != Schema.Type.NULL) + .findFirst() + .orElse(null); + + if (innerTypes.size() != 2 || nonNullType == null) { + throw new AvroRuntimeException( + String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); + } + + return nonNullType; + } + + /** + * Creates schema following Avro's typical nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)}, + * wrapping around provided target non-null type + */ + public static Schema createNullableSchema(Schema.Type avroType) { + checkState(avroType != Schema.Type.NULL); + return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType)); + } + +} diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 9367e23dc64a..bf540a302e71 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -19,7 +19,6 @@ package org.apache.hudi.avro; import org.apache.avro.AvroRuntimeException; -import org.apache.avro.SchemaCompatibility; import org.apache.avro.Conversions; import org.apache.avro.Conversions.DecimalConversion; import org.apache.avro.JsonProperties; @@ -27,6 +26,7 @@ import org.apache.avro.LogicalTypes.Decimal; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; +import org.apache.avro.SchemaCompatibility; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; @@ -64,19 +64,19 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.HashMap; import java.util.TimeZone; -import java.util.Iterator; - import java.util.stream.Collectors; import static org.apache.avro.Schema.Type.UNION; +import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; +import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema; +import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema; /** * Helper class to do common stuff across Avro. @@ -97,8 +97,7 @@ public class HoodieAvroUtils { private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__"; // All metadata fields are optional strings. - public static final Schema METADATA_FIELD_SCHEMA = - Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))); + public static final Schema METADATA_FIELD_SCHEMA = createNullableSchema(Schema.Type.STRING); public static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema(); @@ -327,31 +326,6 @@ public static GenericRecord addOperationToRecord(GenericRecord record, HoodieOpe return record; } - /** - * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. As different query - * engines have varying constraints regarding treating the case-sensitivity of fields, its best to let caller - * determine that. - * - * @param schema Passed in schema - * @param newFieldNames Null Field names to be added - */ - public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { - List newFields = new ArrayList<>(); - for (String newField : newFieldNames) { - newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE)); - } - return createNewSchemaWithExtraFields(schema, newFields); - } - - public static Schema createNewSchemaWithExtraFields(Schema schema, List newFields) { - List fields = schema.getFields().stream() - .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultVal())).collect(Collectors.toList()); - fields.addAll(newFields); - Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()); - newSchema.setFields(fields); - return newSchema; - } - /** * Adds the Hoodie commit metadata into the provided Generic Record. 
*/ @@ -736,46 +710,6 @@ public static Object getRecordColumnValues(HoodieRecord innerTypes = schema.getTypes(); - Schema nonNullType = - innerTypes.stream() - .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName)) - .findFirst() - .orElse(null); - - if (nonNullType == null) { - throw new AvroRuntimeException( - String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); - } - - return nonNullType; - } - - public static Schema resolveNullableSchema(Schema schema) { - if (schema.getType() != Schema.Type.UNION) { - return schema; - } - - List innerTypes = schema.getTypes(); - Schema nonNullType = - innerTypes.stream() - .filter(it -> it.getType() != Schema.Type.NULL) - .findFirst() - .orElse(null); - - if (innerTypes.size() != 2 || nonNullType == null) { - throw new AvroRuntimeException( - String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); - } - - return nonNullType; - } - /** * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema. * support deep rewrite for nested record. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java index 93e9ea5d3433..89bad1c33f59 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java @@ -213,7 +213,7 @@ private boolean readTextFormatMetaFile() { format = Option.empty(); return true; } catch (Throwable t) { - LOG.warn("Unable to read partition meta properties file for partition " + partitionPath, t); + LOG.debug("Unable to read partition meta properties file for partition " + partitionPath); return false; } } @@ -229,8 +229,7 @@ private boolean readBaseFormatMetaFile() { format = Option.of(reader.getFormat()); return true; } catch (Throwable t) { - // any error, log, check the next base format - LOG.warn("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath, t); + LOG.debug("Unable to read partition metadata " + metafilePath.getName() + " for partition " + partitionPath); } } return false; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 254044bd2837..bbc508bd5f9c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -607,7 +607,7 @@ public String getUrlEncodePartitioning() { return getString(URL_ENCODE_PARTITIONING); } - public Boolean isDropPartitionColumns() { + public Boolean shouldDropPartitionColumns() { return getBooleanOrDefault(DROP_PARTITION_COLUMNS); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 262157a8aefa..f178a23eeec7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -23,11 +23,9 @@ import org.apache.avro.Schema.Field; import org.apache.avro.SchemaCompatibility; import 
org.apache.avro.generic.IndexedRecord; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.hfile.CacheConfig; - import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -47,15 +45,13 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.apache.hudi.exception.InvalidTableException; -import org.apache.hudi.io.storage.HoodieHFileReader; -import org.apache.hudi.io.storage.HoodieOrcReader; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager; import org.apache.hudi.internal.schema.utils.SerDeHelper; - +import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.io.storage.HoodieOrcReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; - import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; @@ -67,6 +63,9 @@ import java.util.Arrays; import java.util.List; +import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema; +import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; + /** * Helper class to read schema from data files and log files and to convert it between different formats. * @@ -189,7 +188,7 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception } Option partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields(); - if (metaClient.getTableConfig().isDropPartitionColumns()) { + if (metaClient.getTableConfig().shouldDropPartitionColumns()) { schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema); } return schema; @@ -222,9 +221,9 @@ public static Schema recreateSchemaWhenDropPartitionColumns(Option par List newFields = new ArrayList<>(); for (String partitionField: partitionFields) { newFields.add(new Schema.Field( - partitionField, Schema.create(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); + partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); } - schema = HoodieAvroUtils.createNewSchemaWithExtraFields(schema, newFields); + schema = appendFieldsToSchema(schema, newFields); } } return schema; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 3904ff6f832c..c0e97f33091d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -89,10 +89,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema; import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes; import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema; -import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper; diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java index 59a24a79f013..e0e57e812b8a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; +import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; @@ -57,7 +58,7 @@ public void testRecreateSchemaWhenDropPartitionColumns() { assertNotEquals(originSchema, s4); assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition"))); Schema.Field f = s4.getField("user_partition"); - assertEquals(f.schema().getType().getName(), "string"); + assertEquals(f.schema(), AvroSchemaUtils.createNullableSchema(Schema.Type.STRING)); // case5: user_partition is in originSchema, but partition_path is in originSchema String[] pts4 = {"user_partition", "partition_path"}; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 0aa74ef15433..0e4f9c304cb2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -18,13 +18,7 @@ package org.apache.hudi.hadoop.utils; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; -import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.JsonProperties; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; @@ -32,8 +26,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; @@ -46,6 +40,12 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -60,6 +60,9 @@ import java.util.TreeMap; import java.util.stream.Collectors; +import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema; +import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; + public class HoodieRealtimeRecordReaderUtils { private static final Logger LOG = 
LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class); @@ -287,6 +290,14 @@ public static Schema addPartitionFields(Schema schema, List partitioning List fieldsToAdd = partitioningFields.stream().map(String::toLowerCase) .filter(x -> !firstLevelFieldNames.contains(x)).collect(Collectors.toList()); - return HoodieAvroUtils.appendNullSchemaFields(schema, fieldsToAdd); + return appendNullSchemaFields(schema, fieldsToAdd); + } + + private static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { + List newFields = new ArrayList<>(); + for (String newField : newFieldNames) { + newFields.add(new Schema.Field(newField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); + } + return appendFieldsToSchema(schema, newFields); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index cc8fb0492aff..556b0feef1cd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -17,4 +17,4 @@ org.apache.hudi.DefaultSource -org.apache.spark.sql.execution.datasources.parquet.SparkHoodieParquetFileFormat \ No newline at end of file +org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index adf94fffdeba..5414a228c731 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -20,14 +20,13 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat} import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -56,6 +55,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, override type FileSplit = HoodieBaseFileSplit override lazy val mandatoryColumns: Seq[String] = + // TODO reconcile, record's key shouldn't be mandatory for base-file only relation Seq(recordKeyField) override def imbueConfigs(sqlContext: SQLContext): Unit = { @@ -65,14 +65,14 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], partitionSchema: StructType, - tableSchema: HoodieTableSchema, + dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, filters: Array[Filter]): HoodieUnsafeRDD = { val 
baseFileReader = createBaseFileReader( spark = sparkSession, partitionSchema = partitionSchema, - tableSchema = tableSchema, + dataSchema = dataSchema, requiredSchema = requiredSchema, filters = filters, options = optParams, @@ -114,16 +114,38 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, * rule; you can find more details in HUDI-3896) */ def toHadoopFsRelation: HadoopFsRelation = { + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + val shouldAppendPartitionColumns = shouldOmitPartitionColumns + val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match { - case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet") + case HoodieFileFormat.PARQUET => + (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID) case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") } if (globPaths.isEmpty) { + // NOTE: There are currently 2 ways partition values could be fetched: + // - Source columns (producing the values used for physical partitioning) will be read + // from the data file + // - Values parsed from the actual partition path would be appended to the final dataset + // + // In the former case, we don't need to provide the partition-schema to the relation, + // therefore we simply stub it w/ empty schema and use full table-schema as the one being + // read from the data file. + // + // In the latter, we have to specify proper partition schema as well as "data"-schema, essentially + // being a table-schema with all partition columns stripped out + val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) { + (fileIndex.partitionSchema, fileIndex.dataSchema) + } else { + (StructType(Nil), tableStructSchema) + } + HadoopFsRelation( location = fileIndex, - partitionSchema = fileIndex.partitionSchema, - dataSchema = fileIndex.dataSchema, + partitionSchema = partitionSchema, + dataSchema = dataSchema, bucketSpec = None, fileFormat = tableFileFormat, optParams)(sparkSession) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 53667f3b88ac..2fd1da595036 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hbase.io.hfile.CacheConfig import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.getPartitionPath +import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} import org.apache.hudi.common.fs.FSUtils @@ -36,12 +36,13 @@ import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.io.storage.HoodieHFileReader -import org.apache.spark.TaskContext import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import
org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.HoodieAvroSchemaConverters import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils @@ -50,11 +51,11 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext, SparkSession} import org.apache.spark.unsafe.types.UTF8String -import java.io.Closeable import java.net.URI +import java.util.Locale import scala.collection.JavaConverters._ -import scala.util.Try import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} trait HoodieFileSplit {} @@ -78,7 +79,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, extends BaseRelation with FileRelation with PrunedFilteredScan - with SparkAdapterSupport with Logging { type FileSplit <: HoodieFileSplit @@ -125,14 +125,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = { val schemaUtil = new TableSchemaResolver(metaClient) - val avroSchema = Try(schemaUtil.getTableAvroSchema).getOrElse( - // If there is no commit in the table, we can't get the schema - // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead. - userSchema match { - case Some(s) => sparkAdapter.getAvroSchemaConverters.toAvroType(s, nullable = false, "record") - case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty") - } - ) + val avroSchema = Try(schemaUtil.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + logWarning("Failed to fetch schema from the table", e) + // If there is no commit in the table, we can't get the schema + // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead. + userSchema match { + case Some(s) => convertToAvroSchema(s) + case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty") + } + } // try to find internalSchema val internalSchemaFromMeta = try { schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) @@ -146,11 +149,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) - /** - * if true, need to deal with schema for creating file reader. 
- */ - protected val dropPartitionColumnsWhenWrite: Boolean = - metaClient.getTableConfig.isDropPartitionColumns && partitionColumns.nonEmpty + protected val shouldOmitPartitionColumns: Boolean = + metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty /** * NOTE: PLEASE READ THIS CAREFULLY @@ -205,14 +205,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * NOTE: DO NOT OVERRIDE THIS METHOD */ override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - // NOTE: In case list of requested columns doesn't contain the Primary Key one, we + // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES + // + // In case list of requested columns doesn't contain the Primary Key one, we // have to add it explicitly so that // - Merging could be performed correctly // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], - // Spark still fetches all the rows to execute the query correctly + // Spark still fetches all the rows to execute the query correctly // - // It's okay to return columns that have not been requested by the caller, as those nevertheless will be - // filtered out upstream + // *Appending* additional columns to the ones requested by the caller is not a problem, as those + // will be "projected out" by the caller's projection; + // + // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM + // PROJECTION val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) = @@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val fileSplits = collectFileSplits(partitionFilters, dataFilters) - val partitionSchema = if (dropPartitionColumnsWhenWrite) { - // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in - // data files.
- StructType(partitionColumns.map(StructField(_, StringType))) - } else { - StructType(Nil) - } - val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema) - val dataSchema = if (dropPartitionColumnsWhenWrite) { - val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name))) - HoodieTableSchema( - dataStructType, - sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString() - ) - } else { - tableSchema - } - val requiredSchema = if (dropPartitionColumnsWhenWrite) { - val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name))) - HoodieTableSchema( - requiredStructType, - sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString() - ) + val tableAvroSchemaStr = + if (internalSchema.isEmptySchema) tableAvroSchema.toString + else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString + + val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchema) + val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema) + + // Since schema requested by the caller might contain partition columns, we might need to + // prune it, removing all partition columns from it in case these columns are not persisted + // in the data files + // + // NOTE: This partition schema is only relevant to file reader to be able to embed + // values of partition columns (hereafter referred to as partition values) encoded into + // the partition path, and omitted from the data file, back into fetched rows; + // Note that, by default, partition columns are not omitted therefore specifying + // partition schema for reader is not required + val (partitionSchema, dataSchema, prunedRequiredSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + + if (fileSplits.isEmpty) { + sparkSession.sparkContext.emptyRDD } else { - HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema) + val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, prunedRequiredSchema, filters) + + // NOTE: In case when partition columns have been pruned from the required schema, we have to project + // the rows from the pruned schema back into the one expected by the caller + val projectedRDD = if (prunedRequiredSchema.structTypeSchema != requiredSchema.structTypeSchema) { + rdd.mapPartitions { it => + val fullPrunedSchema = StructType(prunedRequiredSchema.structTypeSchema.fields ++ partitionSchema.fields) + val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema) + it.map(unsafeProjection) + } + } else { + rdd + } + + // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] + // Please check [[needConversion]] scala-doc for more details + projectedRDD.asInstanceOf[RDD[Row]] } - // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] - // Please check [[needConversion]] scala-doc for more details - if (fileSplits.nonEmpty) - composeRDD(fileSplits, partitionSchema, dataSchema, requiredSchema, filters).asInstanceOf[RDD[Row]] - else - sparkSession.sparkContext.emptyRDD } - - /** * Composes 
RDD provided file splits to read from, table and partition schemas, data filters to be applied * * @param fileSplits file splits to be handled by the RDD * @param partitionSchema target table's partition schema - * @param tableSchema target table's schema + * @param dataSchema target table's data files' schema * @param requiredSchema projected schema required by the reader * @param filters data filters to be applied * @return instance of RDD (implementing [[HoodieUnsafeRDD]]) */ protected def composeRDD(fileSplits: Seq[FileSplit], partitionSchema: StructType, - tableSchema: HoodieTableSchema, + dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, filters: Array[Filter]): HoodieUnsafeRDD @@ -325,16 +336,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, } protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { - if (dropPartitionColumnsWhenWrite) { - if (requestedColumns.isEmpty) { - mandatoryColumns.toArray - } else { - requestedColumns - } - } else { - val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) - requestedColumns ++ missing - } + val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) + requestedColumns ++ missing } protected def getTableState: HoodieTableState = { @@ -364,7 +367,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = { try { val tableConfig = metaClient.getTableConfig - if (dropPartitionColumnsWhenWrite) { + if (shouldOmitPartitionColumns) { val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean if (hiveStylePartitioningEnabled) { @@ -388,40 +391,47 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, InternalRow.empty } } -} -object HoodieBaseRelation { - - def getPartitionPath(fileStatus: FileStatus): Path = - fileStatus.getPath.getParent + protected def getColName(f: StructField): String = { + if (sparkSession.sessionState.conf.caseSensitiveAnalysis) { + f.name + } else { + f.name.toLowerCase(Locale.ROOT) + } + } /** * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]] * over [[InternalRow]] */ - def createBaseFileReader(spark: SparkSession, - partitionSchema: StructType, - tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + protected def createBaseFileReader(spark: SparkSession, + partitionSchema: StructType, + dataSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { val hfileReader = createHFileReader( spark = spark, - tableSchema = tableSchema, + dataSchema = dataSchema, requiredSchema = requiredSchema, filters = filters, options = options, hadoopConf = hadoopConf ) + + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + val shouldAppendPartitionColumns = shouldOmitPartitionColumns val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = spark, - dataSchema = tableSchema.structTypeSchema, + dataSchema = 
dataSchema.structTypeSchema, partitionSchema = partitionSchema, requiredSchema = requiredSchema.structTypeSchema, filters = filters, options = options, - hadoopConf = hadoopConf + hadoopConf = hadoopConf, + appendPartitionValues = shouldAppendPartitionColumns ) partitionedFile => { @@ -436,8 +446,38 @@ object HoodieBaseRelation { } } + private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { + if (shouldOmitPartitionColumns) { + val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) + val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) + val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema) + + (partitionSchema, + HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString), + HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString)) + } else { + (StructType(Nil), tableSchema, requiredSchema) + } + } + + private def prunePartitionColumns(dataStructSchema: StructType): StructType = + StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) +} + +object HoodieBaseRelation extends SparkAdapterSupport { + + private def generateUnsafeProjection(from: StructType, to: StructType) = + sparkAdapter.createCatalystExpressionUtils().generateUnsafeProjection(from, to) + + def convertToAvroSchema(structSchema: StructType): Schema = + sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record") + + def getPartitionPath(fileStatus: FileStatus): Path = + fileStatus.getPath.getParent + private def createHFileReader(spark: SparkSession, - tableSchema: HoodieTableSchema, + dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 02264bc4a62f..1fc9e70a5a52 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -21,6 +21,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.utils.SerDeHelper import org.apache.spark.sql.SparkSession @@ -38,8 +39,8 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { /** - * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]] - * to deal with [[ColumnarBatch]] when enable parquet vectorized reader if necessary. 
+ * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] handling [[ColumnarBatch]], + * when Parquet's Vectorized Reader is used */ def buildHoodieParquetReader(sparkSession: SparkSession, dataSchema: StructType, @@ -47,9 +48,11 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], - hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + hadoopConf: Configuration, + appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { - val readParquetFile: PartitionedFile => Iterator[Any] = sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues( + val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get + val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( sparkSession = sparkSession, dataSchema = dataSchema, partitionSchema = partitionSchema, @@ -91,9 +94,12 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { * @param validCommits valid commits, using give validCommits to validate all legal histroy Schema files, and return the latest one. */ def getConfigurationWithInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = { - conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) - conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath) - conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + val querySchemaString = SerDeHelper.toJson(internalSchema) + if (!isNullOrEmpty(querySchemaString)) { + conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) + conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, tablePath) + conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + } conf } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index c86b1615ba58..38062aa80209 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -88,7 +88,7 @@ object HoodieSparkSqlWriter { val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters) - val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator( + val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator( originKeyGeneratorClassName, parameters) //validate datasource and tableconfig keygen are the same validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig); @@ -758,7 +758,7 @@ object HoodieSparkSqlWriter { (params, HoodieWriterUtils.convertMapToHoodieConfig(params)) } - private def extractConfigsRelatedToTimestmapBasedKeyGenerator(keyGenerator: String, + private def extractConfigsRelatedToTimestampBasedKeyGenerator(keyGenerator: String, params: Map[String, String]): Map[String, String] = { if (keyGenerator.equals(classOf[TimestampBasedKeyGenerator].getCanonicalName) || 
keyGenerator.equals(classOf[TimestampBasedAvroKeyGenerator].getCanonicalName)) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 039dafb596d8..d9d5812adbe2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -20,8 +20,8 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import java.util.stream.Collectors +import java.util.stream.Collectors import org.apache.hadoop.fs.{GlobPattern, Path} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.utils.SparkInternalSchemaConverter @@ -36,6 +36,7 @@ import org.apache.hudi.table.HoodieSparkTable import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat import org.apache.spark.sql.sources.{BaseRelation, TableScan} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Row, SQLContext} @@ -183,7 +184,7 @@ class IncrementalRelation(val sqlContext: SQLContext, sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath) sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) val formatClassName = metaClient.getTableConfig.getBaseFileFormat match { - case HoodieFileFormat.PARQUET => if (!internalSchema.isEmptySchema) "HoodieParquet" else "parquet" + case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID case HoodieFileFormat.ORC => "orc" } sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 46e395fc2bfe..6aa7007851d2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -19,9 +19,7 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{GlobPattern, Path} -import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} @@ -61,14 +59,14 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], partitionSchema: StructType, - tableSchema: HoodieTableSchema, + dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, filters: Array[Filter]): HoodieMergeOnReadRDD = { val 
fullSchemaParquetReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, - tableSchema = tableSchema, - requiredSchema = tableSchema, + dataSchema = dataSchema, + requiredSchema = dataSchema, // This file-reader is used to read base file records, subsequently merging them with the records // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding // applying any user-defined filtering _before_ we complete combining them w/ delta-log records (to make sure that @@ -86,7 +84,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, val requiredSchemaParquetReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, - tableSchema = tableSchema, + dataSchema = dataSchema, requiredSchema = requiredSchema, filters = filters ++ incrementalSpanRecordFilters, options = optParams, @@ -99,7 +97,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately // filtered, since file-reader might not be capable to perform filtering new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader, - tableSchema, requiredSchema, hoodieTableState, mergeType, fileSplits) + dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits) } override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index d85788e25b30..a88eb63036db 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -20,17 +20,14 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.sources.Filter @@ -63,14 +60,14 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit], partitionSchema: StructType, - tableSchema: HoodieTableSchema, + dataSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema, filters: Array[Filter]): HoodieMergeOnReadRDD = { val fullSchemaParquetReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, - tableSchema = tableSchema, - 
requiredSchema = tableSchema, + dataSchema = dataSchema, + requiredSchema = dataSchema, // This file-reader is used to read base file records, subsequently merging them with the records // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that @@ -85,7 +82,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val requiredSchemaParquetReader = createBaseFileReader( spark = sqlContext.sparkSession, partitionSchema = partitionSchema, - tableSchema = tableSchema, + dataSchema = dataSchema, requiredSchema = requiredSchema, filters = filters, options = optParams, @@ -96,7 +93,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val tableState = getTableState new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader, - tableSchema, requiredSchema, tableState, mergeType, fileSplits) + dataSchema, requiredSchema, tableState, mergeType, fileSplits) } protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 1305323bd1a2..cd1c1fb4affc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -120,6 +120,9 @@ class SparkHoodieTableFileIndex(spark: SparkSession, StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) } + /** + * @VisibleForTesting + */ def partitionSchema: StructType = { if (queryAsNonePartitionedTable) { // If we read it as Non-Partitioned table, we should not diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala similarity index 58% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala index 150178ea6906..dbb62d089ece 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala @@ -23,26 +23,32 @@ import org.apache.hudi.SparkAdapterSupport import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat.FILE_FORMAT_ID import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -class SparkHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport { - override def shortName(): String = "HoodieParquet" +class HoodieParquetFileFormat extends ParquetFileFormat 
with SparkAdapterSupport { - override def toString: String = "HoodieParquet" + override def shortName(): String = FILE_FORMAT_ID - override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + override def toString: String = "Hoodie-Parquet" + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { sparkAdapter - .createHoodieParquetFileFormat().get + .createHoodieParquetFileFormat(appendPartitionValues = false).get .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) } } +object HoodieParquetFileFormat { + + val FILE_FORMAT_ID = "hoodie-parquet" + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index b232ef010f53..28a6dcdcd60c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -747,7 +747,8 @@ class TestCOWDataSource extends HoodieClientTestBase { assertEquals(resultSchema, schema1) } - @ParameterizedTest @ValueSource(booleans = Array(true, false)) + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) { val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) @@ -897,9 +898,9 @@ class TestCOWDataSource extends HoodieClientTestBase { readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(",")) } - @Disabled("HUDI-3204") - @Test - def testHoodieBaseFileOnlyViewRelation(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = { val _spark = spark import _spark.implicits._ @@ -925,18 +926,27 @@ class TestCOWDataSource extends HoodieClientTestBase { .mode(org.apache.spark.sql.SaveMode.Append) .save(basePath) - val res = spark.read.format("hudi").load(basePath) + // NOTE: We're testing here that both paths are appropriately handling + // partition values, regardless of whether we're reading the table + // t/h a globbed path or not + val path = if (useGlobbing) { + s"$basePath/*/*/*/*" + } else { + basePath + } + + val res = spark.read.format("hudi").load(path) assert(res.count() == 2) // data_date is the partition field. Persist to the parquet file using the origin values, and read it. 
assertEquals( - res.select("data_date").map(_.get(0).toString).collect().sorted, - Array("2018-09-23", "2018-09-24") + res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq, + Seq("2018-09-23", "2018-09-24") ) assertEquals( - res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted, - Array("2018/09/23", "2018/09/24") + res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq, + Seq("2018/09/23", "2018/09/24") ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala index bd7edd4db57b..48bb46f81b1b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -57,7 +57,6 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { val verificationCol: String = "driver" val updatedVerificationVal: String = "driver_update" - @Disabled("HUDI-3896") @ParameterizedTest @CsvSource(Array( "true,org.apache.hudi.keygen.SimpleKeyGenerator", diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index e4b3c4010a5e..0e74c997d7ee 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser @@ -165,7 +165,7 @@ class Spark2Adapter extends SparkAdapter { } } - override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = { - Some(new ParquetFileFormat) + override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark24HoodieParquetFileFormat(appendPartitionValues)) } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala new file mode 100644 index 000000000000..6fb5c50c03a2 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader} +import org.apache.spark.TaskContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.AvroDeserializer +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{AtomicType, StructType} +import org.apache.spark.util.SerializableConfiguration + +import java.net.URI + +/** + * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior + * that's not possible to customize in any other way + * + * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 2.4.4 w/ w/ the following changes applied to it: + *
+ *   1. Avoiding appending partition values to the rows read from the data file
+ */ +class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + requiredSchema.json) + hadoopConf.set( + ParquetWriteSupport.SPARK_ROW_SCHEMA, + requiredSchema.json) + hadoopConf.set( + SQLConf.SESSION_LOCAL_TIMEZONE.key, + sparkSession.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.CASE_SENSITIVE.key, + sparkSession.sessionState.conf.caseSensitiveAnalysis) + + ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) + + // Sets flags for `ParquetToSparkSchemaConverter` + hadoopConf.setBoolean( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sparkSession.sessionState.conf.isParquetBinaryAsString) + hadoopConf.setBoolean( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + // TODO: if you move this into the closure it reverts to the default values. + // If true, enable using the custom RecordReader for parquet. This only works for + // a subset of the types (no complex types). + val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) + val sqlConf = sparkSession.sessionState.conf + val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled + val enableVectorizedReader: Boolean = + sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion + val capacity = sqlConf.parquetVectorizedReaderBatchSize + val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + val returningBatch = supportBatch(sparkSession, resultSchema) + val pushDownDate = sqlConf.parquetFilterPushDownDate + val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + val isCaseSensitive = sqlConf.caseSensitiveAnalysis + + (file: PartitionedFile) => { + assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size) + + val fileSplit = + new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty) + val filePath = fileSplit.getPath + + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + fileSplit.getStart, + fileSplit.getStart + fileSplit.getLength, + fileSplit.getLength, + fileSplit.getLocations, + null) + + val sharedConf = broadcastedHadoopConf.value.value + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + // Try to push down filters when filter push-down is enabled. 
+ val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, + pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter(parquetSchema, _)) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + val iter = new RecordReaderIterator(vectorizedReader) + // SPARK-23457 Register a task completion lister before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + + // NOTE: We're making appending of the partitioned values to the rows read from the + // data file configurable + if (shouldAppendPartitionValues) { + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + } else { + vectorizedReader.initBatch(StructType(Nil), InternalRow.empty) + } + + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } else { + logDebug(s"Falling back to parquet-mr") + // ParquetRecordReader returns UnsafeRow + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter) + } else { + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz)) + } + val iter = new RecordReaderIterator(reader) + // SPARK-23457 Register a task completion lister before `initialization`. 
+ taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + // This is a horrible erasure hack... if we type the iterator above, then it actually check + // the type in next() and we get a class cast exception. If we make that function return + // Object, then we can defer the cast until later! + // + // NOTE: We're making appending of the partitioned values to the rows read from the + // data file configurable + if (!shouldAppendPartitionValues || partitionSchema.length == 0) { + // There is no partition columns + iter.asInstanceOf[Iterator[InternalRow]] + } else { + iter.asInstanceOf[Iterator[InternalRow]] + .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues))) + } + + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 13dba8248827..cd5cd9c82fbe 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -19,14 +19,13 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema -import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer, HoodieSparkAvroSchemaConverters} -import org.apache.spark.sql.hudi.SparkAdapter -import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils} import org.apache.spark.SPARK_VERSION -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat} +import org.apache.spark.sql.hudi.SparkAdapter +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession} /** @@ -55,14 +54,7 @@ class Spark3_1Adapter extends BaseSpark3Adapter { } } - override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = { - if (SPARK_VERSION.startsWith("3.1")) { - val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat" - val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader) - val ctor = clazz.getConstructors.head - Some(ctor.newInstance().asInstanceOf[ParquetFileFormat]) - } else { - None - } + override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark312HoodieParquetFileFormat(appendPartitionValues)) } } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala index 83b3162bbc32..769373866ff3 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala @@ -17,279 +17,312 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.net.URI -import java.util import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.HoodieSparkUtils -import org.apache.hudi.common.util.InternalSchemaCache +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty +import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils} import org.apache.hudi.common.util.collection.Pair import org.apache.hudi.internal.schema.InternalSchema -import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.internal.schema.action.InternalSchemaMerger +import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader} - import org.apache.spark.TaskContext import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.AvroDeserializer import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} -import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType} import org.apache.spark.util.SerializableConfiguration -class Spark312HoodieParquetFileFormat extends ParquetFileFormat { - - // reference ParquetFileFormat from spark project - override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) { - // fallback to origin parquet File read - super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) - } else { - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName) - hadoopConf.set( - ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - requiredSchema.json) - hadoopConf.set( - ParquetWriteSupport.SPARK_ROW_SCHEMA, - requiredSchema.json) - hadoopConf.set( - SQLConf.SESSION_LOCAL_TIMEZONE.key, - sparkSession.sessionState.conf.sessionLocalTimeZone) - hadoopConf.setBoolean( - SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, - sparkSession.sessionState.conf.nestedSchemaPruningEnabled) - hadoopConf.setBoolean( - SQLConf.CASE_SENSITIVE.key, - sparkSession.sessionState.conf.caseSensitiveAnalysis) - - ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) - - // Sets flags for `ParquetToSparkSchemaConverter` - hadoopConf.setBoolean( - SQLConf.PARQUET_BINARY_AS_STRING.key, - sparkSession.sessionState.conf.isParquetBinaryAsString) - hadoopConf.setBoolean( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) - // for dataSource v1, we have no method to do project for spark physical plan. - // it's safe to do cols project here. - val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) - val querySchemaOption = SerDeHelper.fromJson(internalSchemaString) - if (querySchemaOption.isPresent && !requiredSchema.isEmpty) { - val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get()) - hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema)) +import java.net.URI + + +/** + * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior + * that's not possible to customize in any other way + * + * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.1.2 w/ w/ the following changes applied to it: + *
+ *   1. Avoiding appending partition values to the rows read from the data file
+ *   2. Schema on-read
+ */ +class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + requiredSchema.json) + hadoopConf.set( + ParquetWriteSupport.SPARK_ROW_SCHEMA, + requiredSchema.json) + hadoopConf.set( + SQLConf.SESSION_LOCAL_TIMEZONE.key, + sparkSession.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + sparkSession.sessionState.conf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean( + SQLConf.CASE_SENSITIVE.key, + sparkSession.sessionState.conf.caseSensitiveAnalysis) + + ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) + + // Sets flags for `ParquetToSparkSchemaConverter` + hadoopConf.setBoolean( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sparkSession.sessionState.conf.isParquetBinaryAsString) + hadoopConf.setBoolean( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + + val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself, + // therefore it's safe to do schema projection here + if (!isNullOrEmpty(internalSchemaStr)) { + val prunedInternalSchemaStr = + pruneInternalSchema(internalSchemaStr, requiredSchema) + hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr) + } + + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + // TODO: if you move this into the closure it reverts to the default values. + // If true, enable using the custom RecordReader for parquet. This only works for + // a subset of the types (no complex types). 
+ val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) + val sqlConf = sparkSession.sessionState.conf + val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled + val enableVectorizedReader: Boolean = + sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion + val capacity = sqlConf.parquetVectorizedReaderBatchSize + val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + val returningBatch = supportBatch(sparkSession, resultSchema) + val pushDownDate = sqlConf.parquetFilterPushDownDate + val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + val isCaseSensitive = sqlConf.caseSensitiveAnalysis + + (file: PartitionedFile) => { + assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size) + + val filePath = new Path(new URI(file.filePath)) + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + filePath, + file.start, + file.start + file.length, + file.length, + Array.empty, + null) + + val sharedConf = broadcastedHadoopConf.value.value + + // Fetch internal schema + val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + // Internal schema has to be pruned at this point + val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) + + val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent + + val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) + val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; + val fileSchema = if (shouldUseInternalSchema) { + val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) + InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) + } else { + null } - val broadcastedHadoopConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - - // TODO: if you move this into the closure it reverts to the default values. - // If true, enable using the custom RecordReader for parquet. This only works for - // a subset of the types (no complex types). 
- val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) - val sqlConf = sparkSession.sessionState.conf - val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled - val enableVectorizedReader: Boolean = - sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) - val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled - val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion - val capacity = sqlConf.parquetVectorizedReaderBatchSize - val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - val returningBatch = supportBatch(sparkSession, resultSchema) - val pushDownDate = sqlConf.parquetFilterPushDownDate - val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp - val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal - val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith - val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - val isCaseSensitive = sqlConf.caseSensitiveAnalysis - - (file: PartitionedFile) => { - assert(file.partitionValues.numFields == partitionSchema.size) - val filePath = new Path(new URI(file.filePath)) - val split = - new org.apache.parquet.hadoop.ParquetInputSplit( - filePath, - file.start, - file.start + file.length, - file.length, - Array.empty, - null) - val sharedConf = broadcastedHadoopConf.value.value - // do deal with internalSchema - val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) - // querySchema must be a pruned schema. - val querySchemaOption = SerDeHelper.fromJson(internalSchemaString) - val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true - val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) - val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; - val fileSchema = if (internalSchemaChangeEnabled) { - val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) - InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) + + lazy val footerFileMetaData = + ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) { + createParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) } else { - // this should not happened, searchSchemaAndCache will deal with correctly. - null + createParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive) } + filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null))) + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) + } else { + None + } - lazy val footerFileMetaData = - ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( - footerFileMetaData.getKeyValueMetaData.get, - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) { - Spark312HoodieParquetFileFormat.createParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringStartWith, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseMode) - } else { - Spark312HoodieParquetFileFormat.createParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringStartWith, - pushDownInFilterThreshold, - isCaseSensitive) - } - filters.map(Spark312HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get())) - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter(_)) - .reduceOption(FilterApi.and) + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) } else { None } - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. 
- def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + val int96RebaseMode = DataSourceUtils.int96RebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + + // Clone new conf + val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value) + var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap() + if (shouldUseInternalSchema) { + val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() + val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) + typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) + } + val hadoopAttemptContext = + new TaskAttemptContextImpl(hadoopAttemptConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = + if (shouldUseInternalSchema) { + new Spark312HoodieVectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + int96RebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity, + typeChangeInfos) } else { - None + new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + int96RebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) } - val int96RebaseMode = DataSourceUtils.int96RebaseMode( - footerFileMetaData.getKeyValueMetaData.get, - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ)) - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - // use new conf - val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value) - // - // reset request schema - var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap() - if (internalSchemaChangeEnabled) { - val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() - val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) - typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) - hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) - } - val hadoopAttemptContext = - new TaskAttemptContextImpl(hadoopAttempConf, attemptId) - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. 
- if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { - val vectorizedReader = new Spark312HoodieVectorizedParquetRecordReader( - convertTz.orNull, - datetimeRebaseMode.toString, - int96RebaseMode.toString, - enableOffHeapColumnVector && taskContext.isDefined, - capacity, typeChangeInfos) - val iter = new RecordReaderIterator(vectorizedReader) - // SPARK-23457 Register a task completion listener before `initialization`. - taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - vectorizedReader.initialize(split, hadoopAttemptContext) + val iter = new RecordReaderIterator(vectorizedReader) + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + + // NOTE: We're making appending of the partitioned values to the rows read from the + // data file configurable + if (shouldAppendPartitionValues) { logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } + } else { + vectorizedReader.initBatch(StructType(Nil), InternalRow.empty) + } + + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } else { + logDebug(s"Falling back to parquet-mr") + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseMode, + int96RebaseMode) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) } else { - logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseMode, - int96RebaseMode) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) - } else { - new ParquetRecordReader[InternalRow](readSupport) - } - val iter = new RecordReaderIterator[InternalRow](reader) - // SPARK-23457 Register a task completion listener before `initialization`. - taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - reader.initialize(split, hadoopAttemptContext) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = if (typeChangeInfos.isEmpty) { - GenerateUnsafeProjection.generate(fullSchema, fullSchema) - } else { - // find type changed. 
- val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) => - if (typeChangeInfos.containsKey(i)) { - StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata) - } else f - }).toAttributes ++ partitionSchema.toAttributes - val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) => - if (typeChangeInfos.containsKey(i)) { - Cast(attr, typeChangeInfos.get(i).getLeft) - } else attr - } - GenerateUnsafeProjection.generate(castSchema, newFullSchema) - } + new ParquetRecordReader[InternalRow](readSupport) + } + val iter = new RecordReaderIterator[InternalRow](reader) + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) - } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = if (typeChangeInfos.isEmpty) { + GenerateUnsafeProjection.generate(fullSchema, fullSchema) + } else { + // find type changed. + val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) => + if (typeChangeInfos.containsKey(i)) { + StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata) + } else f + }).toAttributes ++ partitionSchema.toAttributes + val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) => + if (typeChangeInfos.containsKey(i)) { + Cast(attr, typeChangeInfos.get(i).getLeft) + } else attr } + GenerateUnsafeProjection.generate(castSchema, newFullSchema) + } + + // NOTE: We're making appending of the partitioned values to the rows read from the + // data file configurable + if (!shouldAppendPartitionValues || partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) } } } @@ -300,6 +333,16 @@ object Spark312HoodieParquetFileFormat { val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters" + def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { + val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) + if (querySchemaOption.isPresent && requiredSchema.nonEmpty) { + val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get()) + SerDeHelper.toJson(prunedSchema) + } else { + internalSchemaStr + } + } + private def createParquetFilters(arg: Any*): ParquetFilters = { val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader) val ctor = clazz.getConstructors.head diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index bad392b4f97a..15624c741130 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import 
org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark32HoodieParquetFileFormat} import org.apache.spark.sql.parser.HoodieSpark3_2ExtendedSqlParser import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_2CatalystExpressionUtils, SparkSession} @@ -80,14 +80,7 @@ class Spark3_2Adapter extends BaseSpark3Adapter { } } - override def createHoodieParquetFileFormat(): Option[ParquetFileFormat] = { - if (SPARK_VERSION.startsWith("3.2")) { - val loadClassName = "org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat" - val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader) - val ctor = clazz.getConstructors.head - Some(ctor.newInstance().asInstanceOf[ParquetFileFormat]) - } else { - None - } + override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { + Some(new Spark32HoodieParquetFileFormat(appendPartitionValues)) } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index 28db4739656e..f2a0a21df830 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.net.URI - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.FileSplit @@ -27,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.util.InternalSchemaCache +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.collection.Pair import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.action.InternalSchemaMerger @@ -34,226 +33,266 @@ import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS -import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader} +import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} import org.apache.spark.TaskContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet} import 
org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType} import org.apache.spark.util.SerializableConfiguration -class Spark32HoodieParquetFileFormat extends ParquetFileFormat { - - // reference ParquetFileFormat from spark project - override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) { - // fallback to origin parquet File read - super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) - } else { - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) - hadoopConf.set( - ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, - requiredSchema.json) - hadoopConf.set( - ParquetWriteSupport.SPARK_ROW_SCHEMA, - requiredSchema.json) - hadoopConf.set( - SQLConf.SESSION_LOCAL_TIMEZONE.key, - sparkSession.sessionState.conf.sessionLocalTimeZone) - hadoopConf.setBoolean( - SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, - sparkSession.sessionState.conf.nestedSchemaPruningEnabled) - hadoopConf.setBoolean( - SQLConf.CASE_SENSITIVE.key, - sparkSession.sessionState.conf.caseSensitiveAnalysis) - - ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) - - // Sets flags for `ParquetToSparkSchemaConverter` - hadoopConf.setBoolean( - SQLConf.PARQUET_BINARY_AS_STRING.key, - sparkSession.sessionState.conf.isParquetBinaryAsString) - hadoopConf.setBoolean( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) - // for dataSource v1, we have no method to do project for spark physical plan. - // it's safe to do cols project here. - val internalSchemaString = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) - val querySchemaOption = SerDeHelper.fromJson(internalSchemaString) - if (querySchemaOption.isPresent && !requiredSchema.isEmpty) { - val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get()) - hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(prunedSchema)) +import java.net.URI + +/** + * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior + * that's not possible to customize in any other way + * + * NOTE: This is a version of [[AvroDeserializer]] impl from Spark 3.2.1 w/ w/ the following changes applied to it: + *
+ *   1. Avoiding appending partition values to the rows read from the data file
+ *   2. Schema on-read
+ */ +class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + requiredSchema.json) + hadoopConf.set( + ParquetWriteSupport.SPARK_ROW_SCHEMA, + requiredSchema.json) + hadoopConf.set( + SQLConf.SESSION_LOCAL_TIMEZONE.key, + sparkSession.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + sparkSession.sessionState.conf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean( + SQLConf.CASE_SENSITIVE.key, + sparkSession.sessionState.conf.caseSensitiveAnalysis) + + ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) + + // Sets flags for `ParquetToSparkSchemaConverter` + hadoopConf.setBoolean( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sparkSession.sessionState.conf.isParquetBinaryAsString) + hadoopConf.setBoolean( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + + val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself, + // therefore it's safe to do schema projection here + if (!isNullOrEmpty(internalSchemaStr)) { + val prunedInternalSchemaStr = + pruneInternalSchema(internalSchemaStr, requiredSchema) + hadoopConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr) + } + + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + // TODO: if you move this into the closure it reverts to the default values. + // If true, enable using the custom RecordReader for parquet. This only works for + // a subset of the types (no complex types). 
+ val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) + val sqlConf = sparkSession.sessionState.conf + val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled + val enableVectorizedReader: Boolean = + sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion + val capacity = sqlConf.parquetVectorizedReaderBatchSize + val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + val returningBatch = supportBatch(sparkSession, resultSchema) + val pushDownDate = sqlConf.parquetFilterPushDownDate + val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + val isCaseSensitive = sqlConf.caseSensitiveAnalysis + val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) + val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead + val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead + + (file: PartitionedFile) => { + assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size) + + val filePath = new Path(new URI(file.filePath)) + val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) + + val sharedConf = broadcastedHadoopConf.value.value + + // Fetch internal schema + val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + // Internal schema has to be pruned at this point + val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) + + val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent + + val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) + val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; + val fileSchema = if (shouldUseInternalSchema) { + val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) + InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) + } else { + null } - val broadcastedHadoopConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - - // TODO: if you move this into the closure it reverts to the default values. - // If true, enable using the custom RecordReader for parquet. This only works for - // a subset of the types (no complex types). 
- val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields) - val sqlConf = sparkSession.sessionState.conf - val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled - val enableVectorizedReader: Boolean = - sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) - val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled - val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion - val capacity = sqlConf.parquetVectorizedReaderBatchSize - val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - val returningBatch = supportBatch(sparkSession, resultSchema) - val pushDownDate = sqlConf.parquetFilterPushDownDate - val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp - val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal - val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith - val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - val isCaseSensitive = sqlConf.caseSensitiveAnalysis - val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf) - val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead - val int96RebaseModeInread = parquetOptions.int96RebaseModeInRead - - (file: PartitionedFile) => { - assert(file.partitionValues.numFields == partitionSchema.size) - val filePath = new Path(new URI(file.filePath)) - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - val sharedConf = broadcastedHadoopConf.value.value - // do deal with internalSchema - val internalSchemaString = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) - // querySchema must be a pruned schema. - val querySchemaOption = SerDeHelper.fromJson(internalSchemaString) - val internalSchemaChangeEnabled = if (internalSchemaString.isEmpty || !querySchemaOption.isPresent) false else true - val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) - val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; - val fileSchema = if (internalSchemaChangeEnabled) { - val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) - InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) - } else { - // this should not happened, searchSchemaAndCache will deal with correctly. - null - } - lazy val footerFileMetaData = - ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringStartWith, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters.map(Spark32HoodieParquetFileFormat.rebuildFilterFromParquet(_, fileSchema, querySchemaOption.get())) - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. 
- .flatMap(parquetFilters.createFilter(_)) - .reduceOption(FilterApi.and) + lazy val footerFileMetaData = + ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null))) + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) } else { None } - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. - def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + + // Clone new conf + val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value) + var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap() + if (shouldUseInternalSchema) { + val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() + val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) + typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) + } + val hadoopAttemptContext = + new TaskAttemptContextImpl(hadoopAttemptConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = + if (shouldUseInternalSchema) { + new Spark32HoodieVectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity, + typeChangeInfos) } else { - None + new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInread) - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - // use new conf - val hadoopAttempConf = new Configuration(broadcastedHadoopConf.value.value) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. // - // reset request schema - var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap() - if (internalSchemaChangeEnabled) { - val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() - val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) - typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) - hadoopAttempConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) - } - val hadoopAttemptContext = - new TaskAttemptContextImpl(hadoopAttempConf, attemptId) + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + vectorizedReader.initialize(split, hadoopAttemptContext) - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { - val vectorizedReader = new Spark32HoodieVectorizedParquetRecordReader( - convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, - enableOffHeapColumnVector && taskContext.isDefined, - capacity, typeChangeInfos) - val iter = new RecordReaderIterator(vectorizedReader) - // SPARK-23457 Register a task completion listener before `initialization`. 
- // taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) - try { - vectorizedReader.initialize(split, hadoopAttemptContext) + // NOTE: We're making appending of the partitioned values to the rows read from the + // data file configurable + if (shouldAppendPartitionValues) { logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } + } else { + vectorizedReader.initBatch(StructType(Nil), InternalRow.empty) + } - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + if (returningBatch) { + vectorizedReader.enableReturningBatches() } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } else { + logDebug(s"Falling back to parquet-mr") + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) } else { - logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) - } else { - new ParquetRecordReader[InternalRow](readSupport) - } - val iter = new RecordReaderIterator[InternalRow](reader) - // SPARK-23457 Register a task completion listener before `initialization`. - taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + new ParquetRecordReader[InternalRow](readSupport) + } + val iter = new RecordReaderIterator[InternalRow](reader) + try { reader.initialize(split, hadoopAttemptContext) val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes @@ -274,13 +313,21 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat { GenerateUnsafeProjection.generate(castSchema, newFullSchema) } - if (partitionSchema.length == 0) { + // NOTE: We're making appending of the partitioned values to the rows read from the + // data file configurable + if (!shouldAppendPartitionValues || partitionSchema.length == 0) { // There is no partition columns iter.map(unsafeProjection) } else { val joinedRow = new JoinedRow() iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. 
+ iter.close() + throw e } } } @@ -289,6 +336,16 @@ class Spark32HoodieParquetFileFormat extends ParquetFileFormat { object Spark32HoodieParquetFileFormat { + def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { + val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) + if (querySchemaOption.isPresent && requiredSchema.nonEmpty) { + val prunedSchema = SparkInternalSchemaConverter.convertAndPruneStructTypeToInternalSchema(requiredSchema, querySchemaOption.get()) + SerDeHelper.toJson(prunedSchema) + } else { + internalSchemaStr + } + } + private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { if (fileSchema == null || querySchema == null) { oldFilter
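
Below is an illustrative Scala sketch, separate from the patch itself, of what the new `shouldAppendPartitionValues` flag controls in the non-vectorized read path: when the flag is off the reader emits only the requested data columns, and when it is on each data row is joined with the file's (constant) partition values before being projected into an unsafe row. The helper name `toOutputRows` and the wrapping object are hypothetical and exist only for this example; the actual logic is the `buildReaderWithPartitionValues` code in the diff above.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the parquet-mr fallback path shown in the diff.
object PartitionValueAppendingSketch {

  def toOutputRows(dataRows: Iterator[InternalRow],
                   partitionValues: InternalRow,
                   requiredSchema: StructType,
                   partitionSchema: StructType,
                   shouldAppendPartitionValues: Boolean): Iterator[InternalRow] = {
    if (!shouldAppendPartitionValues || partitionSchema.isEmpty) {
      // Partition values are NOT appended: output rows carry only the requested data columns
      val attrs = requiredSchema.toAttributes
      val unsafeProjection = GenerateUnsafeProjection.generate(attrs, attrs)
      dataRows.map(unsafeProjection)
    } else {
      // Partition values ARE appended: each data row is joined with the partition values
      // and the joined row is projected into a single unsafe row
      val fullAttrs = requiredSchema.toAttributes ++ partitionSchema.toAttributes
      val unsafeProjection = GenerateUnsafeProjection.generate(fullAttrs, fullAttrs)
      val joinedRow = new JoinedRow()
      dataRows.map(d => unsafeProjection(joinedRow(d, partitionValues)))
    }
  }
}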