From c71805c763f244e9e59832b9d67f48d74f1e9c64 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Mon, 25 Apr 2022 20:36:29 -0700
Subject: [PATCH] [HUDI-3974] Fix schema projection to skip non-existent
 preCombine field

---
 .../org/apache/hudi/HoodieSparkUtils.scala    | 21 ++++--
 .../schema/utils/InternalSchemaUtils.java     | 64 +++++++++++++++++--
 .../org/apache/hudi/HoodieBaseRelation.scala  | 10 +--
 .../apache/hudi/TestHoodieSparkUtils.scala    |  2 +-
 .../TestParquetColumnProjection.scala         |  3 +-
 5 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 7a8f8a1580d97..0831631924587 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
+import org.apache.log4j.LogManager
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
@@ -44,6 +45,7 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 object HoodieSparkUtils extends SparkAdapterSupport {
+  private val LOG = LogManager.getLogger(HoodieSparkUtils.getClass)
 
   def isSpark2: Boolean = SPARK_VERSION.startsWith("2.")
 
@@ -318,13 +320,23 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     AttributeReference(columnName, field.get.dataType, field.get.nullable)()
   }
 
-  def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String], internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = {
-    if (internalSchema.isEmptySchema || requiredColumns.isEmpty) {
+  def getRequiredSchema(tableAvroSchema: Schema, queryAndHudiRequiredFields: (Array[String], Array[String]),
+                        internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = {
+    val queryRequiredFields = queryAndHudiRequiredFields._1
+    val hudiRequiredFields = queryAndHudiRequiredFields._2
+    if (internalSchema.isEmptySchema || (queryRequiredFields.isEmpty && hudiRequiredFields.isEmpty)) {
       // First get the required avro-schema, then convert the avro-schema to spark schema.
       val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
       // Here have to create a new Schema.Field object
       // to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used".
-      val requiredFields = requiredColumns.map(c => name2Fields(c))
+      val requiredFields = (queryRequiredFields ++ hudiRequiredFields.filter(c => {
+        val containsColumn = name2Fields.contains(c)
+        if (!containsColumn) {
+          LOG.warn(String.format("Cannot find Hudi required field: field %s does not exist in Hudi table", c))
+        }
+        containsColumn
+      }))
+        .map(c => name2Fields(c))
         .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
       val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
         tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
@@ -332,7 +344,8 @@ object HoodieSparkUtils extends SparkAdapterSupport {
       (requiredAvroSchema, requiredStructSchema, internalSchema)
     } else {
       // now we support nested project
-      val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
+      val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(
+        internalSchema, queryRequiredFields.toList.asJava, hudiRequiredFields.toList.asJava)
       val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName)
       val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
       (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index a784b409b8f2f..291aad2bd4011 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -25,14 +25,17 @@
 import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.Types.Field;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.SortedMap;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
@@ -40,6 +43,7 @@
  * eg: column prune, filter rebuild for query engine...
  */
 public class InternalSchemaUtils {
+  private static final Logger LOG = LogManager.getLogger(InternalSchemaUtils.class);
 
   private InternalSchemaUtils() {
   }
@@ -54,29 +58,75 @@ private InternalSchemaUtils() {
    */
   public static InternalSchema pruneInternalSchema(InternalSchema schema, List<String> names) {
     // do check
-    List<Integer> prunedIds = names.stream().map(name -> {
+    List<Integer> prunedIds = names.stream()
+        .filter(name -> {
+          int id = schema.findIdByName(name);
+          if (id < 0) {
+            LOG.warn(String.format("cannot prune col: %s does not exist in hudi table", name));
+            return false;
+          }
+          return true;
+        })
+        .map(schema::findIdByName).collect(Collectors.toList());
+    // find top parent field ID. eg: a.b.c, f.g.h, only collect id of a and f ignore all child field.
+    List<Integer> topParentFieldIds = new ArrayList<>();
+    names.stream().forEach(f -> {
+      int id = schema.findIdByName(f.split("\\.")[0]);
+      if (!topParentFieldIds.contains(id)) {
+        topParentFieldIds.add(id);
+      }
+    });
+    return pruneInternalSchemaByID(schema, prunedIds, topParentFieldIds);
+  }
+
+  /**
+   * Create project internalSchema, based on the project names produced by the query engine and the fields required by Hudi.
+   * support nested project.
+   *
+   * @param schema a internal schema.
+   * @param queryFields project names produced by query engine.
+   * @param hudiFields project names required by Hudi merging.
+   * @return a project internalSchema.
+   */
+  public static InternalSchema pruneInternalSchema(InternalSchema schema, List<String> queryFields, List<String> hudiFields) {
+    // do check
+    List<Integer> allPrunedIds = queryFields.stream().map(name -> {
       int id = schema.findIdByName(name);
       if (id == -1) {
-        throw new IllegalArgumentException(String.format("cannot prune col: %s which not exisit in hudi table", name));
+        throw new IllegalArgumentException(String.format("Cannot prune col from query: %s does not exist in hudi table", name));
       }
       return id;
     }).collect(Collectors.toList());
+    List<String> allFields = new ArrayList<>(queryFields);
+    // Filter non-existent Hudi fields
+    List<String> filteredHudiFields = hudiFields.stream()
+        .filter(name -> {
+          int id = schema.findIdByName(name);
+          if (id < 0) {
+            LOG.warn(String.format("Cannot prune col from Hudi: %s does not exist in hudi table", name));
+            return false;
+          }
+          return true;
+        }).collect(Collectors.toList());
+    allFields.addAll(filteredHudiFields);
+    allPrunedIds.addAll(filteredHudiFields.stream()
+        .map(schema::findIdByName).collect(Collectors.toList()));
     // find top parent field ID. eg: a.b.c, f.g.h, only collect id of a and f ignore all child field.
     List<Integer> topParentFieldIds = new ArrayList<>();
-    names.stream().forEach(f -> {
+    allFields.forEach(f -> {
       int id = schema.findIdByName(f.split("\\.")[0]);
       if (!topParentFieldIds.contains(id)) {
         topParentFieldIds.add(id);
       }
     });
-    return pruneInternalSchemaByID(schema, prunedIds, topParentFieldIds);
+    return pruneInternalSchemaByID(schema, allPrunedIds, topParentFieldIds);
   }
 
   /**
    * Create project internalSchema.
    * support nested project.
    *
-   * @param schema a internal schema.
+   * @param schema   a internal schema.
    * @param fieldIds project col field_ids.
    * @return a project internalSchema.
    */
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 4b7177f4d6326..89b0f3119548e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -248,10 +248,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //
     // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
     //       PROJECTION
-    val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
+    val queryAndHudiRequiredFields: (Array[String], Array[String]) = appendMandatoryRootFields(requiredColumns)
 
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
-      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, queryAndHudiRequiredFields, internalSchema)
 
     val filterExpressions = convertToExpressions(filters)
     val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
@@ -364,12 +364,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     !SubqueryExpression.hasSubquery(condition)
   }
 
-  protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
+  protected final def appendMandatoryRootFields(queryRequestedColumns: Array[String]): (Array[String], Array[String]) = {
     // For a nested field in mandatory columns, we should first get the root-level field, and then
     // check for any missing column, as the requestedColumns should only contain root-level fields
     // We should only append root-level field as well
-    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
-    requestedColumns ++ missing
+    (queryRequestedColumns,
+      mandatoryRootFields.filter(rootField => !queryRequestedColumns.contains(rootField)).toArray)
   }
 
   protected def getTableState: HoodieTableState = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index e71973f94a164..972b4ac168f39 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -222,7 +222,7 @@ class TestHoodieSparkUtils {
     val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
 
     val (requiredAvroSchema, requiredStructSchema, _) =
-      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts"))
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, (Array("ts"), Array()))
 
     assertEquals("timestamp-millis",
       requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 945d26be3f464..a3fe48532ec09 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -333,7 +333,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     }
 
     val readColumns = targetColumns ++ relation.mandatoryFields
-    val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
+    val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(
+      tableState.schema, (targetColumns, relation.mandatoryFields.toArray))
 
     val row: InternalRow = rows.take(1).head