HoodieSparkUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
@@ -44,6 +45,7 @@ import java.util.Properties
import scala.collection.JavaConverters._

object HoodieSparkUtils extends SparkAdapterSupport {
private val LOG = LogManager.getLogger(HoodieSparkUtils.getClass)

def isSpark2: Boolean = SPARK_VERSION.startsWith("2.")

@@ -318,21 +320,32 @@ object HoodieSparkUtils extends SparkAdapterSupport {
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}

def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String], internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = {
if (internalSchema.isEmptySchema || requiredColumns.isEmpty) {
def getRequiredSchema(tableAvroSchema: Schema, queryAndHudiRequiredFields: (Array[String], Array[String]),
internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = {
val queryRequiredFields = queryAndHudiRequiredFields._1
val hudiRequiredFields = queryAndHudiRequiredFields._2
if (internalSchema.isEmptySchema || (queryRequiredFields.isEmpty && hudiRequiredFields.isEmpty)) {
// First get the required avro-schema, then convert the avro-schema to spark schema.
val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
// Here have to create a new Schema.Field object
// to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used".
val requiredFields = requiredColumns.map(c => name2Fields(c))
val requiredFields = (queryRequiredFields ++ hudiRequiredFields.filter(c => {
val containsColumn = name2Fields.contains(c)
if (!containsColumn) {
LOG.warn(String.format("Cannot find Hudi required field: field %s does not exist in Hudi table", c))
}
containsColumn
}))
.map(c => name2Fields(c))
.map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, internalSchema)
} else {
// now we support nested project
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(
internalSchema, queryRequiredFields.toList.asJava, hudiRequiredFields.toList.asJava)
val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
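The behavioral split introduced in this hunk can be summarized as: columns requested by the query must exist in the table schema, while Hudi-required fields (for example a configured preCombine field) are dropped with a warning when they are missing. Below is a minimal, self-contained Scala sketch of that policy; the helper names are illustrative and not part of the Hudi API.

// Sketch of the field-resolution policy implied by the new getRequiredSchema signature.
object RequiredFieldsSketch {
  def resolveRequiredFields(tableFields: Set[String],
                            queryFields: Seq[String],
                            hudiFields: Seq[String]): Seq[String] = {
    // Fields requested by the query must exist; a missing one is a user-facing error.
    queryFields.foreach { f =>
      require(tableFields.contains(f), s"Cannot prune col from query: $f does not exist in hudi table")
    }
    // Hudi-required fields are best effort: skip missing ones and warn.
    val presentHudiFields = hudiFields.filter { f =>
      val exists = tableFields.contains(f)
      if (!exists) println(s"WARN: Cannot find Hudi required field: $f does not exist in Hudi table")
      exists
    }
    queryFields ++ presentHudiFields
  }
}

// Example: the missing preCombine-style field is ignored, query fields are preserved in order.
// RequiredFieldsSketch.resolveRequiredFields(Set("a", "b", "ts"), Seq("a", "b"), Seq("ts", "missing_precombine"))
//   == Seq("a", "b", "ts")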
InternalSchemaUtils.java
@@ -25,21 +25,25 @@
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.internal.schema.Types.Field;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.SortedMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
* Util methods to help us do some operations on InternalSchema.
* eg: column prune, filter rebuild for query engine...
*/
public class InternalSchemaUtils {
private static final Logger LOG = LogManager.getLogger(InternalSchemaUtils.class);

private InternalSchemaUtils() {
}
@@ -54,29 +58,75 @@ private InternalSchemaUtils() {
*/
public static InternalSchema pruneInternalSchema(InternalSchema schema, List<String> names) {
// do check
List<Integer> prunedIds = names.stream().map(name -> {
List<Integer> prunedIds = names.stream()
.filter(name -> {
int id = schema.findIdByName(name);
Contributor:
Can you help me understand something. I understand that if a non-existent preCombine field is part of the names, we ignore it.
But if someone runs a query "select a,b,c from tbl" where b does not even exist in the table, we have to throw an exception. Can you confirm that is not affected by this fix here?

Contributor (Author):
Actually, on second thought, the filtering should not happen here. I'm going to rethink how to make the fix.

if (id < 0) {
LOG.warn(String.format("cannot prune col: %s does not exist in hudi table", name));
Contributor:
Prior to this patch we were throwing an exception, and now we are not. Is this change intended?

return false;
}
return true;
})
.map(schema::findIdByName).collect(Collectors.toList());
// find top parent field ID. eg: a.b.c, f.g.h, only collect id of a and f ignore all child field.
List<Integer> topParentFieldIds = new ArrayList<>();
names.stream().forEach(f -> {
int id = schema.findIdByName(f.split("\\.")[0]);
if (!topParentFieldIds.contains(id)) {
topParentFieldIds.add(id);
}
});
return pruneInternalSchemaByID(schema, prunedIds, topParentFieldIds);
}

/**
* Create project internalSchema, based on the project names produced by the query engine and the fields required by Hudi.
* support nested project.
*
* @param schema a internal schema.
* @param queryFields project names produced by query engine.
* @param hudiFields project names required by Hudi merging.
* @return a project internalSchema.
*/
public static InternalSchema pruneInternalSchema(InternalSchema schema, List<String> queryFields, List<String> hudiFields) {
Contributor:
With the addition of this new method, is the single-argument method at L59 still called anywhere? I would expect all callers to use this one instead of that one.

// do check
List<Integer> allPrunedIds = queryFields.stream().map(name -> {
int id = schema.findIdByName(name);
if (id == -1) {
throw new IllegalArgumentException(String.format("cannot prune col: %s which not exisit in hudi table", name));
throw new IllegalArgumentException(String.format("Cannot prune col from query: %s does not exist in hudi table", name));
}
return id;
}).collect(Collectors.toList());
List<String> allFields = new ArrayList<>(queryFields);
// Filter non-existent Hudi fields
List<String> filteredHudiFields = hudiFields.stream()
.filter(name -> {
int id = schema.findIdByName(name);
if (id < 0) {
LOG.warn(String.format("Cannot prune col from Hudi: %s does not exist in hudi table", name));
return false;
}
return true;
}).collect(Collectors.toList());
allFields.addAll(filteredHudiFields);
allPrunedIds.addAll(filteredHudiFields.stream()
.map(schema::findIdByName).collect(Collectors.toList()));
// find top parent field ID. eg: a.b.c, f.g.h, only collect id of a and f ignore all child field.
List<Integer> topParentFieldIds = new ArrayList<>();
names.stream().forEach(f -> {
allFields.forEach(f -> {
int id = schema.findIdByName(f.split("\\.")[0]);
if (!topParentFieldIds.contains(id)) {
topParentFieldIds.add(id);
}
});
return pruneInternalSchemaByID(schema, prunedIds, topParentFieldIds);
return pruneInternalSchemaByID(schema, allPrunedIds, topParentFieldIds);
}

/**
* Create project internalSchema.
* support nested project.
*
* @param schema a internal schema.
* @param schema a internal schema.
* @param fieldIds project col field_ids.
* @return a project internalSchema.
*/
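Both pruneInternalSchema variants handle nested projection by collecting only the top-level parent of each dotted field path (for a.b.c and f.g.h, only the IDs of a and f are collected). A small Scala sketch of that grouping step, assuming a plain name-to-id lookup in place of InternalSchema.findIdByName:

// Sketch: collect distinct top-level parent field IDs from possibly nested field names.
def topParentFieldIds(fields: Seq[String], findIdByName: String => Int): Seq[Int] =
  fields
    .map(f => findIdByName(f.split("\\.")(0))) // keep only the root segment of a dotted path
    .distinct                                  // deduplicate while preserving encounter order

// Example with a toy lookup (a -> 1, f -> 7):
// topParentFieldIds(Seq("a.b.c", "a.d", "f.g.h"), Map("a" -> 1, "f" -> 7)) == Seq(1, 7)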
HoodieBaseRelation.scala
@@ -248,10 +248,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
//
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
// PROJECTION
val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
val queryAndHudiRequiredFields: (Array[String], Array[String]) = appendMandatoryRootFields(requiredColumns)

val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, queryAndHudiRequiredFields, internalSchema)

val filterExpressions = convertToExpressions(filters)
val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
@@ -364,12 +364,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
!SubqueryExpression.hasSubquery(condition)
}

protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
protected final def appendMandatoryRootFields(queryRequestedColumns: Array[String]): (Array[String], Array[String]) = {
// For a nested field in mandatory columns, we should first get the root-level field, and then
// check for any missing column, as the requestedColumns should only contain root-level fields
// We should only append root-level field as well
val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
(queryRequestedColumns,
mandatoryRootFields.filter(rootField => !queryRequestedColumns.contains(rootField)).toArray)
}

protected def getTableState: HoodieTableState = {
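With this change the relation keeps the query-requested columns and the missing mandatory root fields in separate halves of a tuple instead of concatenating them into one array. A simplified sketch of the new contract; the mandatory field names in the example are only illustrative:

// Sketch: return (query-requested columns, mandatory root fields not already requested).
def appendMandatoryRootFieldsSketch(queryRequestedColumns: Array[String],
                                    mandatoryRootFields: Seq[String]): (Array[String], Array[String]) =
  (queryRequestedColumns,
    mandatoryRootFields.filterNot(f => queryRequestedColumns.contains(f)).toArray)

// Example:
// appendMandatoryRootFieldsSketch(Array("a", "ts"), Seq("_hoodie_record_key", "ts"))
//   == (Array("a", "ts"), Array("_hoodie_record_key"))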
TestHoodieSparkUtils.scala
@@ -222,7 +222,7 @@ class TestHoodieSparkUtils {
val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)

val (requiredAvroSchema, requiredStructSchema, _) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, Array("ts"))
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, (Array("ts"), Array()))

assertEquals("timestamp-millis",
requiredAvroSchema.getField("ts").schema().getTypes.get(1).getLogicalType.getName)
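A hypothetical follow-up test could cover the new lenient handling of missing Hudi-required fields; the schema JSON and field names below are illustrative, and JUnit assertions are assumed to be imported as in the surrounding tests:

// Hypothetical test sketch: a missing Hudi-required field is dropped instead of failing the read.
val sampleSchemaJson =
  """{"type":"record","name":"rec","fields":[
    |  {"name":"a","type":"string"},
    |  {"name":"ts","type":"long"}
    |]}""".stripMargin
val sampleAvroSchema = new Schema.Parser().parse(sampleSchemaJson)

val (requiredAvroSchema, _, _) =
  HoodieSparkUtils.getRequiredSchema(sampleAvroSchema, (Array("a"), Array("ts", "missing_precombine")))

// Query field and existing Hudi field are retained; the absent Hudi field is skipped with a warning.
assertNotNull(requiredAvroSchema.getField("a"))
assertNotNull(requiredAvroSchema.getField("ts"))
assertNull(requiredAvroSchema.getField("missing_precombine"))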
TestParquetColumnProjection.scala
@@ -333,7 +333,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
}

val readColumns = targetColumns ++ relation.mandatoryFields
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(
tableState.schema, (targetColumns, relation.mandatoryFields.toArray))

val row: InternalRow = rows.take(1).head
