Merged
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType}

object HoodieParquetFileFormatHelper {

@@ -104,18 +104,24 @@ object HoodieParquetFileFormatHelper {
requiredSchema: StructType,
partitionSchema: StructType,
schemaUtils: HoodieSchemaUtils): UnsafeProjection = {
val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean]
val addedCastCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean]

def hasFloatToDoubleConversion(src: DataType, dst: DataType): Boolean = {
floatToDoubleCache.getOrElseUpdate((src, dst), {
def hasUnsupportedConversion(src: DataType, dst: DataType): Boolean = {
addedCastCache.getOrElseUpdate((src, dst), {
(src, dst) match {
case (FloatType, DoubleType) => true
case (IntegerType, DecimalType()) => true
case (LongType, DecimalType()) => true
case (FloatType, DecimalType()) => true
case (DoubleType, DecimalType()) => true
case (StringType, DecimalType()) => true
case (StringType, DateType) => true
case (StructType(srcFields), StructType(dstFields)) =>
srcFields.zip(dstFields).exists { case (sf, df) => hasFloatToDoubleConversion(sf.dataType, df.dataType) }
srcFields.zip(dstFields).exists { case (sf, df) => hasUnsupportedConversion(sf.dataType, df.dataType) }
case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
hasFloatToDoubleConversion(sElem, dElem)
hasUnsupportedConversion(sElem, dElem)
case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
hasFloatToDoubleConversion(sKey, dKey) || hasFloatToDoubleConversion(sVal, dVal)
hasUnsupportedConversion(sKey, dKey) || hasUnsupportedConversion(sVal, dVal)
case _ => false
}
})
@@ -127,7 +133,14 @@
case (FloatType, DoubleType) =>
val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
case (s: StructType, d: StructType) if hasFloatToDoubleConversion(s, d) =>
case (IntegerType | LongType | FloatType | DoubleType, dec: DecimalType) =>
val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
Cast(toStr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, dec: DecimalType) =>
Cast(expr, dec, if (needTimeZone) timeZoneId else None)
case (StringType, DateType) =>
Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
case (s: StructType, d: StructType) if hasUnsupportedConversion(s, d) =>
val structFields = s.fields.zip(d.fields).zipWithIndex.map {
case ((srcField, dstField), i) =>
val child = GetStructField(expr, i, Some(dstField.name))
@@ -136,13 +149,13 @@
CreateNamedStruct(d.fields.zip(structFields).flatMap {
case (f, c) => Seq(Literal(f.name), c)
})
case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasFloatToDoubleConversion(sElementType, dElementType) =>
case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasUnsupportedConversion(sElementType, dElementType) =>
val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull)
val body = recursivelyCastExpressions(lambdaVar, sElementType, dElementType)
val func = LambdaFunction(body, Seq(lambdaVar))
ArrayTransform(expr, func)
case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _))
if hasFloatToDoubleConversion(sKeyType, dKeyType) || hasFloatToDoubleConversion(sValType, dValType) =>
if hasUnsupportedConversion(sKeyType, dKeyType) || hasUnsupportedConversion(sValType, dValType) =>
val kv = NamedLambdaVariable("kv", new StructType()
.add("key", sKeyType, nullable = false)
.add("value", sValType, nullable = vnull), nullable = false)
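Aside: the reason the Scala changes above cast through StringType for float and double sources is that widening a binary float directly exposes digits the user never wrote, while the string rendering keeps the shortest decimal form. A self-contained illustration in plain Java (independent of Spark; class and variable names are chosen only for this example):

import java.math.BigDecimal;

public class FloatWideningDemo {
  public static void main(String[] args) {
    float f = 0.1f;

    // Direct widening keeps the exact binary value of the float, which is not exactly 0.1.
    double direct = (double) f;                                      // 0.10000000149011612
    BigDecimal directDecimal = new BigDecimal(direct);               // 0.100000001490116119384765625

    // Rendering to a string first keeps the decimal form the value was written with,
    // which is what the Cast(expr, StringType) -> Cast(..., DecimalType) chain relies on.
    double viaString = Double.parseDouble(Float.toString(f));        // 0.1
    BigDecimal viaStringDecimal = new BigDecimal(Float.toString(f)); // 0.1

    System.out.println(direct + " vs " + viaString);
    System.out.println(directDecimal + " vs " + viaStringDecimal);
  }
}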
@@ -23,7 +23,9 @@
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestMerger;
@@ -64,6 +66,7 @@
*/
public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord> {
private final Map<StoragePath, HoodieAvroFileReader> reusableFileReaders;
private final boolean isMultiFormat;

/**
* Constructs an instance of the reader context that will read data into Avro records.
@@ -125,6 +128,7 @@ private HoodieAvroReaderContext(
boolean requiresPayloadRecords) {
super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName, requiresPayloadRecords));
this.reusableFileReaders = reusableFileReaders;
this.isMultiFormat = tableConfig.isMultipleBaseFileFormatsEnabled();
}

@Override
@@ -136,15 +140,27 @@ public ClosableIterator<IndexedRecord> getFileRecordIterator(
Schema requiredSchema,
HoodieStorage storage) throws IOException {
HoodieAvroFileReader reader;
boolean isLogFile = FSUtils.isLogFile(filePath);
Schema fileOutputSchema;
Map<String, String> renamedColumns;
if (isLogFile) {
fileOutputSchema = requiredSchema;
renamedColumns = Collections.emptyMap();
} else {
Pair<Schema, Map<String, String>> requiredSchemaForFileAndRenamedColumns = getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(filePath);
fileOutputSchema = requiredSchemaForFileAndRenamedColumns.getLeft();
renamedColumns = requiredSchemaForFileAndRenamedColumns.getRight();
}
if (reusableFileReaders.containsKey(filePath)) {
reader = reusableFileReaders.get(filePath);
} else {
HoodieFileFormat fileFormat = isMultiFormat && !isLogFile ? HoodieFileFormat.fromFileExtension(filePath.getFileExtension()) : baseFileFormat;
reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
filePath, baseFileFormat, Option.empty());
filePath, fileFormat, Option.empty());
}
if (keyFilterOpt.isEmpty()) {
return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, renamedColumns);
}
if (reader.supportKeyPredicate()) {
List<String> keys = reader.extractKeys(keyFilterOpt);
@@ -158,7 +174,7 @@ public ClosableIterator<IndexedRecord> getFileRecordIterator(
return reader.getIndexedRecordsByKeyPrefixIterator(keyPrefixes, requiredSchema);
}
}
return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, renamedColumns);
}

@Override
32 changes: 17 additions & 15 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1406,8 +1406,15 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ
if (writerSchema.equals(readerSchema)) {
return false;
}
if (readerSchema.getLogicalType() != null) {
// check if logical types are equal
return !readerSchema.getLogicalType().equals(writerSchema.getLogicalType());
}
switch (readerSchema.getType()) {
case RECORD:
if (readerSchema.getFields().size() > writerSchema.getFields().size()) {
return true;
}
for (Schema.Field field : readerSchema.getFields()) {
Schema.Field writerField = writerSchema.getField(field.name());
if (writerField == null || recordNeedsRewriteForExtendedAvroTypePromotion(writerField.schema(), field.schema())) {
@@ -1430,13 +1437,13 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ
case ENUM:
return needsRewriteToString(writerSchema, true);
case STRING:
case BYTES:
return needsRewriteToString(writerSchema, false);
case DOUBLE:
// To maintain precision, you need to convert Float -> String -> Double
return writerSchema.getType().equals(Schema.Type.FLOAT);
case FLOAT:
case LONG:
return !(writerSchema.getType().equals(Schema.Type.INT) || writerSchema.getType().equals(Schema.Type.LONG));
default:
return false;
return !writerSchema.getType().equals(readerSchema.getType());
}
}

@@ -1446,18 +1453,13 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ
* string so some intervention is needed
*/
private static boolean needsRewriteToString(Schema schema, boolean isEnum) {
switch (schema.getType()) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BYTES:
return true;
case ENUM:
return !isEnum;
default:
return false;
if (schema.getLogicalType() != null) {
return true;
}
if (schema.getType() == Schema.Type.ENUM) {
return !isEnum;
}
return true;
}

/**
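Aside: the updated recordNeedsRewriteForExtendedAvroTypePromotion now also flags a mismatch whenever the reader schema carries a logical type that differs from the writer's. A small, hedged illustration using the plain Avro API (the schema shapes are invented for this example; only the helper method itself comes from the file above):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

public class TypePromotionCheckDemo {
  public static void main(String[] args) {
    // Writer persisted a plain long; the evolved reader schema asks for a decimal
    // logical type backed by bytes, so plain Avro primitive promotion is not enough.
    Schema writer = Schema.create(Schema.Type.LONG);
    Schema reader = LogicalTypes.decimal(20, 2).addToSchema(Schema.create(Schema.Type.BYTES));

    // With the logical-type check added above, this should report that a rewrite is needed.
    System.out.println(
        HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(writer, reader));
  }
}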
@@ -24,16 +24,21 @@
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.storage.StoragePath;

import org.apache.avro.Schema;

@@ -76,22 +81,24 @@ public class FileGroupReaderSchemaHandler<T> {

protected final TypedProperties properties;
private final DeleteContext deleteContext;
private final HoodieTableMetaClient metaClient;

public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
Schema tableSchema,
Schema requestedSchema,
Option<InternalSchema> internalSchemaOpt,
HoodieTableConfig hoodieTableConfig,
TypedProperties properties) {
TypedProperties properties,
HoodieTableMetaClient metaClient) {
this.properties = properties;
this.readerContext = readerContext;
this.tableSchema = tableSchema;
this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
this.hoodieTableConfig = hoodieTableConfig;
this.hoodieTableConfig = metaClient.getTableConfig();
this.deleteContext = new DeleteContext(properties, tableSchema);
this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema(this.deleteContext));
this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
this.metaClient = metaClient;
}

public Schema getTableSchema() {
@@ -125,6 +132,18 @@ public DeleteContext getDeleteContext() {
return deleteContext;
}

public Pair<Schema, Map<String, String>> getRequiredSchemaForFileAndRenamedColumns(StoragePath path) {
if (internalSchema.isEmptySchema()) {
return Pair.of(requiredSchema, Collections.emptyMap());
}
long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(path.getName()));
InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, metaClient);
Contributor: Seems not right; the search happens at the file split level, so this would trigger the metaClient metadata file listing for every file slice read. Can we reuse the cache somewhere so it is shared by all the readers?

Contributor Author: Is there an example of how to do this? I noticed that this is how it is currently done in the merge path. This path will at least cache per JVM. There are some other cases where I see calls to InternalSchemaCache.getInternalSchemaByVersionId, but that skips the cache entirely, so the commit metadata is parsed per file.

Contributor: Looks like we already did this in FileGroupRecordBuffer.composeEvolvedSchemaTransformer, and we have optimized the logic in #13525 to get rid of the timeline listing, so this should be good now.

Contributor: Yes, the existing logic of schema evolution on read in other places follows the same code logic, so this is OK in the sense that it brings feature parity and does not introduce a regression.

I think what makes more sense is to have a schema history (schemas for ranges of completion/instant time, e.g. schema1: ts1-ts100, schema2: ts101-ts1000, etc.) constructed on the driver and distributed to executors. This schema history could be stored under .hoodie so that one file read gets the whole schema history and executors do not pay the cost of scanning commit metadata or reading the schema from the file (assuming that the file schema is based on the writer/table schema of the commit). This essentially needs a new schema system / abstraction, which is under the scope of RFC-88 @danny0405

Contributor: Yes, we have a plan to re-implement schema evolution based on the new schema abstraction in the 1.2 release.
Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
true, false, false).mergeSchemaGetRenamed();
Schema mergedAvroSchema = AvroSchemaCache.intern(AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), requiredSchema.getFullName()));
return Pair.of(mergedAvroSchema, mergedInternalSchema.getRight());
}

private InternalSchema pruneInternalSchema(Schema requiredSchema, Option<InternalSchema> internalSchemaOption) {
if (!internalSchemaOption.isPresent()) {
return InternalSchema.getEmptyInternalSchema();
@@ -116,8 +116,8 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorag
readerContext.setShouldMergeUseRecordPosition(readerParameters.useRecordPosition() && !isSkipMerge && readerContext.getHasLogFiles() && inputSplit.isParquetBaseFile());
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
: new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient)
: new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient));
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
this.orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), props, hoodieTableMetaClient);
this.readStats = new HoodieReadStats();
@@ -21,7 +21,7 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
@@ -46,9 +46,9 @@ public ParquetRowIndexBasedSchemaHandler(HoodieReaderContext<T> readerContext,
Schema dataSchema,
Schema requestedSchema,
Option<InternalSchema> internalSchemaOpt,
HoodieTableConfig hoodieTableConfig,
TypedProperties properties) {
super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties);
TypedProperties properties,
HoodieTableMetaClient metaClient) {
super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, properties, metaClient);
if (!readerContext.getRecordContext().supportsParquetRowIndex()) {
throw new IllegalStateException("Using " + this.getClass().getName() + " but context does not support parquet row index");
}
@@ -29,7 +29,9 @@
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.common.util.TypeUtils.unsafeCast;

@@ -48,7 +50,11 @@ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema reader
return getIndexedRecordIterator(readerSchema, readerSchema);
}

public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
return getIndexedRecordIterator(readerSchema, requestedSchema, Collections.emptyMap());
}

public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) throws IOException;

public abstract ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys,
Schema readerSchema)
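Aside: the change above keeps the old two-argument getIndexedRecordIterator as a concrete convenience method that forwards to the new three-argument one, so existing callers compile unchanged and subclasses only implement the wider signature. A generic, purely hypothetical sketch of the same overload-forwarding pattern:

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

// Hypothetical reader hierarchy; it only illustrates the overload-forwarding pattern.
abstract class BaseReader<R> {
  // Old entry point, now a concrete wrapper that existing call sites keep using.
  public Iterator<R> records(String readerSchema, String requestedSchema) {
    return records(readerSchema, requestedSchema, Collections.emptyMap());
  }

  // New, wider entry point; the only method subclasses must implement.
  public abstract Iterator<R> records(String readerSchema,
                                      String requestedSchema,
                                      Map<String, String> renamedColumns);
}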
@@ -96,9 +96,10 @@ private HoodieNativeAvroHFileReader(HFileReaderFactory readerFactory,

@Override
public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema,
Schema requestedSchema)
Schema requestedSchema,
Map<String, String> renamedColumns)
throws IOException {
if (!Objects.equals(readerSchema, requestedSchema)) {
if (!Objects.equals(readerSchema, requestedSchema) || !renamedColumns.isEmpty()) {
throw new UnsupportedOperationException(
"Schema projections are not supported in HFile reader");
}
@@ -561,7 +561,8 @@ private ReusableFileGroupRecordBufferLoader<IndexedRecord> buildReusableRecordBu
readerContext.initRecordMerger(metadataConfig.getProps());
readerContext.setHasBootstrapBaseFile(false);
readerContext.setHasLogFiles(fileSlice.hasLogFiles());
readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(), metadataMetaClient.getTableConfig(), metadataConfig.getProps()));
readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(),
metadataConfig.getProps(), metadataMetaClient));
readerContext.setShouldMergeUseRecordPosition(false);
readerContext.setLatestCommitTime(latestMetadataInstantTime);
return FileGroupRecordBufferLoader.createReusable(readerContext);