diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index bf3063c5d4733..cebf3145bfd28 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -2094,13 +2094,13 @@ public void testCommitWritesRelativePaths(boolean populateMetaFields) throws Exc HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(commitInstant).get(), HoodieCommitMetadata.class); String basePath = table.getMetaClient().getBasePath(); - Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values(); + Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(new Path(basePath)).values(); // Read from commit file try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime))) { String everything = FileIOUtils.readAsUTFString(inputStream); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class); - HashMap paths = metadata.getFileIdAndFullPaths(basePath); + HashMap paths = metadata.getFileIdAndFullPaths(new Path(basePath)); // Compare values in both to make sure they are equal. for (String pathName : paths.values()) { assertTrue(commitPathNames.contains(pathName)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 75d2d14221d32..3387dd24bb3b0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -109,7 +109,7 @@ private static HashMap getLatestFileIDsToFullPath(String basePat for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); - fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(basePath)); + fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(new Path(basePath))); } return fileIdToFullPath; } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index dd14dca671b39..d45cfe0351ed9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -109,4 +109,15 @@ public static Schema createNullableSchema(Schema.Type avroType) { return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType)); } + /** + * Returns true in case when schema contains the field w/ provided name + */ + public static boolean containsFieldInSchema(Schema schema, String fieldName) { + try { + Schema.Field field = schema.getField(fieldName); + return field != null; + } catch (Exception e) { + return false; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index e2b586964ef4e..66066040275bf 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -166,18 +166,6 @@ public static GenericRecord jsonBytesToAvro(byte[] bytes, Schema schema) throws return reader.read(null, jsonDecoder); } - /** - * True if the schema contains this name of field - */ - public static boolean containsFieldInSchema(Schema schema, String fieldName) { - try { - Field field = schema.getField(fieldName); - return field != null; - } catch (Exception e) { - return false; - } - } - public static boolean isMetadataField(String fieldName) { return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName) || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index f5077dea859ae..41d83813f1e0f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -125,7 +125,7 @@ public WriteOperationType getOperationType() { return this.operationType; } - public HashMap getFileIdAndFullPaths(String basePath) { + public HashMap getFileIdAndFullPaths(Path basePath) { HashMap fullPaths = new HashMap<>(); for (Map.Entry entry : getFileIdAndRelativePaths().entrySet()) { String fullPath = entry.getValue() != null diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index b76f71161d320..4f99926887692 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -43,6 +43,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.internal.schema.InternalSchema; @@ -50,6 +51,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.io.storage.HoodieOrcReader; +import org.apache.hudi.util.Lazy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -58,100 +60,56 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; +import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema; +import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema; import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; /** * Helper class to read schema from data files and log files and to convert it between different formats. 
- * - * TODO(HUDI-3626) cleanup */ +@ThreadSafe public class TableSchemaResolver { private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class); + private final HoodieTableMetaClient metaClient; - private final boolean hasOperationField; - public TableSchemaResolver(HoodieTableMetaClient metaClient) { - this.metaClient = metaClient; - this.hasOperationField = hasOperationField(); - } + /** + * Signals whether the suite of meta-fields should include an additional field designating the + * operation a particular record was added by. Note that determining whether this meta-field + * should be appended to the schema requires reading out the actual schema of some data file, + * since it's ultimately the source of truth whether this field has to be represented in + * the schema + */ + private final Lazy hasOperationField; /** - * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest - * commit. We will assume that the schema has not changed within a single atomic write. + * NOTE: {@link HoodieCommitMetadata} could be of non-trivial size for large tables (in 100s of Mbs) + * and therefore we'd want to limit the amount of throw-away work being performed while fetching + * commits' metadata * - * @return Parquet schema for this table + * Please check out corresponding methods to fetch commonly used instances of {@link HoodieCommitMetadata}: + * {@link #getLatestCommitMetadataWithValidSchema()}, + * {@link #getLatestCommitMetadataWithValidData()}, + * {@link #getCachedCommitMetadata(HoodieInstant)} */ - private MessageType getTableParquetSchemaFromDataFile() { - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - Option> instantAndCommitMetadata = - activeTimeline.getLastCommitMetadataWithValidData(); - try { - switch (metaClient.getTableType()) { - case COPY_ON_WRITE: - // For COW table, the file has data written must be in parquet or orc format currently. - if (instantAndCommitMetadata.isPresent()) { - HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - Iterator filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator(); - return fetchSchemaFromFiles(filePaths); - } else { - throw new IllegalArgumentException("Could not find any data file written for commit, " - + "so could not get schema for table " + metaClient.getBasePath()); - } - case MERGE_ON_READ: - // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile. - // Determine the file format based on the file name, and then extract schema from it.
- if (instantAndCommitMetadata.isPresent()) { - HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - Iterator filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator(); - return fetchSchemaFromFiles(filePaths); - } else { - throw new IllegalArgumentException("Could not find any data file written for commit, " - + "so could not get schema for table " + metaClient.getBasePath()); - } - default: - LOG.error("Unknown table type " + metaClient.getTableType()); - throw new InvalidTableException(metaClient.getBasePath()); - } - } catch (IOException e) { - throw new HoodieException("Failed to read data schema", e); - } - } + private final Lazy> commitMetadataCache; - private MessageType fetchSchemaFromFiles(Iterator filePaths) throws IOException { - MessageType type = null; - while (filePaths.hasNext() && type == null) { - String filePath = filePaths.next(); - if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { - // this is a log file - type = readSchemaFromLogFile(new Path(filePath)); - } else { - type = readSchemaFromBaseFile(filePath); - } - } - return type; - } + private volatile HoodieInstant latestCommitWithValidSchema = null; + private volatile HoodieInstant latestCommitWithValidData = null; - private MessageType readSchemaFromBaseFile(String filePath) throws IOException { - if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) { - // this is a parquet file - return readSchemaFromParquetBaseFile(new Path(filePath)); - } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) { - // this is a HFile - return readSchemaFromHFileBaseFile(new Path(filePath)); - } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) { - // this is a ORC file - return readSchemaFromORCBaseFile(new Path(filePath)); - } else { - throw new IllegalArgumentException("Unknown base file format :" + filePath); - } + public TableSchemaResolver(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + this.commitMetadataCache = Lazy.lazily(() -> new ConcurrentHashMap<>(2)); + this.hasOperationField = Lazy.lazily(this::hasOperationField); } public Schema getTableAvroSchemaFromDataFile() { @@ -176,86 +134,25 @@ public Schema getTableAvroSchema() throws Exception { * @throws Exception */ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception { - Schema schema; - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields); - if (schemaFromCommitMetadata.isPresent()) { - schema = schemaFromCommitMetadata.get(); - } else { - Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); - if (schemaFromTableConfig.isPresent()) { - if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField); - } else { - schema = schemaFromTableConfig.get(); - } - } else { - if (includeMetadataFields) { - schema = getTableAvroSchemaFromDataFile(); - } else { - schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); - } - } - } - - Option partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields(); - if (metaClient.getTableConfig().shouldDropPartitionColumns()) { - schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema); - } - return schema; + return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); } - public static Schema recreateSchemaWhenDropPartitionColumns(Option partitionFieldsOpt, Schema 
originSchema) { - // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in data files. - // And there are no partition schema if the schema is parsed from data files. - // Here we create partition Fields for this case, and use StringType as the data type. - Schema schema = originSchema; - if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 0) { - List partitionFields = Arrays.asList(partitionFieldsOpt.get()); - - final Schema schema0 = originSchema; - boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch( - pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt) - ); - boolean hasPartitionColInSchema = partitionFields.stream().anyMatch( - pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt) - ); - if (hasPartitionColNotInSchema && hasPartitionColInSchema) { - throw new HoodieIncompatibleSchemaException( - "Not support: Partial partition fields are still in the schema " - + "when enable hoodie.datasource.write.drop.partition.columns"); - } - - if (hasPartitionColNotInSchema) { - // when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns - // are not in originSchema. So we create and add them. - List newFields = new ArrayList<>(); - for (String partitionField: partitionFields) { - newFields.add(new Schema.Field( - partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); - } - schema = appendFieldsToSchema(schema, newFields); - } - } - return schema; + /** + * Fetches tables schema in Avro format as of the given instant + * + * @param instant as of which table's schema will be fetched + */ + public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { + return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)); } /** * Gets full schema (user + metadata) for a hoodie table in Parquet format. * * @return Parquet schema for the table - * @throws Exception */ public MessageType getTableParquetSchema() throws Exception { - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true); - if (schemaFromCommitMetadata.isPresent()) { - return convertAvroSchemaToParquet(schemaFromCommitMetadata.get()); - } - Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); - if (schemaFromTableConfig.isPresent()) { - Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField); - return convertAvroSchemaToParquet(schema); - } - return getTableParquetSchemaFromDataFile(); + return convertAvroSchemaToParquet(getTableAvroSchema(true)); } /** @@ -271,41 +168,43 @@ public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { return getTableAvroSchema(false); } - /** - * Gets users data schema for a hoodie table in Avro format of the instant. 
- * - * @param instant will get the instant data schema - * @return Avro user data schema - * @throws Exception - * @deprecated use {@link #getTableSchemaFromCommitMetadata} instead - */ - @Deprecated - public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception { - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(instant, false); - if (schemaFromCommitMetadata.isPresent()) { - return schemaFromCommitMetadata.get(); - } - Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); - if (schemaFromTableConfig.isPresent()) { - return schemaFromTableConfig.get(); + private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option instantOpt) { + Schema schema = + (instantOpt.isPresent() + ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) + : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) + .or(() -> + metaClient.getTableConfig().getTableCreateSchema() + .map(tableSchema -> + includeMetadataFields + ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) + : tableSchema) + ) + .orElseGet(() -> { + Schema schemaFromDataFile = getTableAvroSchemaFromDataFile(); + return includeMetadataFields + ? schemaFromDataFile + : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile); + }); + + // TODO partition columns have to be appended in all read-paths + if (metaClient.getTableConfig().shouldDropPartitionColumns()) { + return metaClient.getTableConfig().getPartitionFields() + .map(partitionFields -> appendPartitionColumns(schema, partitionFields)) + .orElse(schema); } - return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); + + return schema; } - /** - * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit with valid schema. - * - * @return Avro schema for this table - */ - private Option getTableSchemaFromCommitMetadata(boolean includeMetadataFields) { - Option> instantAndCommitMetadata = - metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + private Option getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { + Option> instantAndCommitMetadata = getLatestCommitMetadataWithValidSchema(); if (instantAndCommitMetadata.isPresent()) { HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); Schema schema = new Schema.Parser().parse(schemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField); + schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get()); } return Option.of(schema); } else { @@ -313,17 +212,9 @@ private Option getTableSchemaFromCommitMetadata(boolean includeMetadataF } } - - /** - * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the instant. 
- * - * @return Avro schema for this table - */ private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - byte[] data = timeline.getInstantDetails(instant).get(); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); if (StringUtils.isNullOrEmpty(existingSchemaStr)) { @@ -332,7 +223,7 @@ private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, b Schema schema = new Schema.Parser().parse(existingSchemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField); + schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get()); } return Option.of(schema); } catch (Exception e) { @@ -341,23 +232,41 @@ private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, b } /** - * Convert a parquet scheme to the avro format. - * - * @param parquetSchema The parquet schema to convert - * @return The converted avro schema + * Fetches the schema for a table from any the table's data files */ - public Schema convertParquetSchemaToAvro(MessageType parquetSchema) { + private MessageType getTableParquetSchemaFromDataFile() { + Option> instantAndCommitMetadata = getLatestCommitMetadataWithValidData(); + try { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + case MERGE_ON_READ: + // For COW table, data could be written in either Parquet or Orc format currently; + // For MOR table, data could be written in either Parquet, Orc, Hfile or Delta-log format currently; + // + // Determine the file format based on the file name, and then extract schema from it. + if (instantAndCommitMetadata.isPresent()) { + HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); + Iterator filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator(); + return fetchSchemaFromFiles(filePaths); + } else { + throw new IllegalArgumentException("Could not find any data file written for commit, " + + "so could not get schema for table " + metaClient.getBasePath()); + } + default: + LOG.error("Unknown table type " + metaClient.getTableType()); + throw new InvalidTableException(metaClient.getBasePath()); + } + } catch (IOException e) { + throw new HoodieException("Failed to read data schema", e); + } + } + + private Schema convertParquetSchemaToAvro(MessageType parquetSchema) { AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); return avroSchemaConverter.convert(parquetSchema); } - /** - * Convert a avro scheme to the parquet format. - * - * @param schema The avro schema to convert - * @return The converted parquet schema - */ - public MessageType convertAvroSchemaToParquet(Schema schema) { + private MessageType convertAvroSchemaToParquet(Schema schema) { AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); return avroSchemaConverter.convert(schema); } @@ -482,10 +391,7 @@ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAd return latestSchema; } - /** - * Read the parquet schema from a parquet File. 
- */ - public MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException { + private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException { LOG.info("Reading schema from " + parquetFilePath); FileSystem fs = metaClient.getRawFs(); @@ -494,35 +400,27 @@ public MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IO return fileFooter.getFileMetaData().getSchema(); } - /** - * Read the parquet schema from a HFile. - */ - public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException { + private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException { LOG.info("Reading schema from " + hFilePath); FileSystem fs = metaClient.getRawFs(); CacheConfig cacheConfig = new CacheConfig(fs.getConf()); HoodieHFileReader hFileReader = new HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig); - return convertAvroSchemaToParquet(hFileReader.getSchema()); } - - /** - * Read the parquet schema from a ORC file. - */ - public MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException { + private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException { LOG.info("Reading schema from " + orcFilePath); FileSystem fs = metaClient.getRawFs(); HoodieOrcReader orcReader = new HoodieOrcReader<>(fs.getConf(), orcFilePath); - return convertAvroSchemaToParquet(orcReader.getSchema()); } /** * Read schema from a data file from the last compaction commit done. - * @throws Exception + * + * @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead */ public MessageType readSchemaFromLastCompaction(Option lastCompactionCommitOpt) throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -533,18 +431,13 @@ public MessageType readSchemaFromLastCompaction(Option lastCompac // Read from the compacted file wrote HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); - String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().stream().findAny() .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); return readSchemaFromBaseFile(filePath); } - /** - * Read the schema from the log file on path. - * - * @return - */ - public MessageType readSchemaFromLogFile(Path path) throws IOException { + private MessageType readSchemaFromLogFile(Path path) throws IOException { return readSchemaFromLogFile(metaClient.getRawFs(), path); } @@ -566,20 +459,6 @@ public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws } } - public boolean isHasOperationField() { - return hasOperationField; - } - - private boolean hasOperationField() { - try { - Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); - return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; - } catch (Exception e) { - LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage())); - return false; - } - } - /** * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant. 
* @@ -587,11 +466,7 @@ private boolean hasOperationField() { */ public Option getTableInternalSchemaFromCommitMetadata() { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - return getTableInternalSchemaFromCommitMetadata(timeline.lastInstant().get()); - } else { - return Option.empty(); - } + return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata); } /** @@ -601,9 +476,7 @@ public Option getTableInternalSchemaFromCommitMetadata() { */ private Option getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) { try { - HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedInstants(); - byte[] data = timeline.getInstantDetails(instant).get(); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA); if (latestInternalSchemaStr != null) { return SerDeHelper.fromJson(latestInternalSchemaStr); @@ -626,4 +499,128 @@ public Option getTableHistorySchemaStrFromCommitMetadata() { String result = manager.getHistorySchemaStr(); return result.isEmpty() ? Option.empty() : Option.of(result); } + + /** + * NOTE: This method could only be used in tests + * + * @VisibleForTesting + */ + public boolean hasOperationField() { + try { + Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); + return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; + } catch (Exception e) { + LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage())); + return false; + } + } + + private Option> getLatestCommitMetadataWithValidSchema() { + if (latestCommitWithValidSchema == null) { + Option> instantAndCommitMetadata = + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + if (instantAndCommitMetadata.isPresent()) { + HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); + HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight(); + synchronized (this) { + if (latestCommitWithValidSchema == null) { + latestCommitWithValidSchema = instant; + } + commitMetadataCache.get().putIfAbsent(instant, metadata); + } + } + } + + return Option.ofNullable(latestCommitWithValidSchema) + .map(instant -> Pair.of(instant, commitMetadataCache.get().get(instant))); + } + + private Option> getLatestCommitMetadataWithValidData() { + if (latestCommitWithValidData == null) { + Option> instantAndCommitMetadata = + metaClient.getActiveTimeline().getLastCommitMetadataWithValidData(); + if (instantAndCommitMetadata.isPresent()) { + HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); + HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight(); + synchronized (this) { + if (latestCommitWithValidData == null) { + latestCommitWithValidData = instant; + } + commitMetadataCache.get().putIfAbsent(instant, metadata); + } + } + } + + return Option.ofNullable(latestCommitWithValidData) + .map(instant -> Pair.of(instant, commitMetadataCache.get().get(instant))); + } + + private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) { + return commitMetadataCache.get() + .computeIfAbsent(instant, (missingInstant) -> { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + byte[] data = 
timeline.getInstantDetails(missingInstant).get(); + try { + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", missingInstant), e); + } + }); + } + + private MessageType fetchSchemaFromFiles(Iterator filePaths) throws IOException { + MessageType type = null; + while (filePaths.hasNext() && type == null) { + String filePath = filePaths.next(); + if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { + // this is a log file + type = readSchemaFromLogFile(new Path(filePath)); + } else { + type = readSchemaFromBaseFile(filePath); + } + } + return type; + } + + private MessageType readSchemaFromBaseFile(String filePath) throws IOException { + if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) { + return readSchemaFromParquetBaseFile(new Path(filePath)); + } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) { + return readSchemaFromHFileBaseFile(new Path(filePath)); + } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) { + return readSchemaFromORCBaseFile(new Path(filePath)); + } else { + throw new IllegalArgumentException("Unknown base file format :" + filePath); + } + } + + static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields) { + // In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns + // won't be persisted w/in the data files, and therefore we need to append such columns + // when schema is parsed from data files + // + // Here we append partition columns with {@code StringType} as the data type + if (partitionFields.length == 0) { + return dataSchema; + } + + boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf)); + boolean hasPartitionColInSchema = Arrays.stream(partitionFields).anyMatch(pf -> containsFieldInSchema(dataSchema, pf)); + if (hasPartitionColNotInSchema && hasPartitionColInSchema) { + throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema"); + } + + if (hasPartitionColNotInSchema) { + // when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns + // are not in originSchema. So we create and add them. 
+ List newFields = new ArrayList<>(); + for (String partitionField: partitionFields) { + newFields.add(new Schema.Field( + partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); + } + return appendFieldsToSchema(dataSchema, newFields); + } + + return dataSchema; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index a62068e655e5d..c069e41ade265 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -18,6 +18,10 @@ package org.apache.hudi.common.table.timeline; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -28,11 +32,6 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -45,11 +44,10 @@ import java.util.Comparator; import java.util.Date; import java.util.HashSet; -import java.util.List; import java.util.Objects; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Represents the Active Timeline for the Hoodie table. 
Instants for the last 12 hours (configurable) is in the @@ -267,43 +265,47 @@ public Option getInstantDetails(HoodieInstant instant) { } /** - * Get the last instant with valid schema, and convert this to HoodieCommitMetadata + * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata} */ public Option> getLastCommitMetadataWithValidSchema() { - List completed = getCommitsTimeline().filterCompletedInstants().getInstants() - .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList()); - for (HoodieInstant instant : completed) { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - getInstantDetails(instant).get(), HoodieCommitMetadata.class); - if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) { - return Option.of(Pair.of(instant, commitMetadata)); - } - } catch (IOException e) { - LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString()); - } - } - return Option.empty(); + return Option.fromJavaOptional( + getCommitMetadataStream() + .filter(instantCommitMetadataPair -> + !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + .findFirst() + ); } /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata */ public Option> getLastCommitMetadataWithValidData() { - List completed = getCommitsTimeline().filterCompletedInstants().getInstants() - .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList()); - for (HoodieInstant instant : completed) { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - getInstantDetails(instant).get(), HoodieCommitMetadata.class); - if (!commitMetadata.getFileIdAndRelativePaths().isEmpty()) { - return Option.of(Pair.of(instant, commitMetadata)); - } - } catch (IOException e) { - LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString()); - } - } - return Option.empty(); + return Option.fromJavaOptional( + getCommitMetadataStream() + .filter(instantCommitMetadataPair -> + !instantCommitMetadataPair.getValue().getFileIdAndRelativePaths().isEmpty()) + .findFirst() + ); + } + + /** + * Returns stream of {@link HoodieCommitMetadata} in order reverse to chronological (ie most + * recent metadata being the first element) + */ + private Stream> getCommitMetadataStream() { + // NOTE: Streams are lazy + return getCommitsTimeline().filterCompletedInstants() + .getInstants() + .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()) + .map(instant -> { + try { + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class); + return Pair.of(instant, commitMetadata); + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", instant), e); + } + }); } public Option readCleanerInfoAsBytes(HoodieInstant instant) { diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java index 106969b70ff6c..1a843430b7d9c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java +++ b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import javax.annotation.concurrent.ThreadSafe; import java.util.function.Supplier; /** @@ -25,6 +26,7 @@ * * @param type of the object being 
held by {@link Lazy} */ +@ThreadSafe public class Lazy { private volatile boolean initialized; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java index e0e57e812b8a2..51d5c5212f260 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -19,11 +19,8 @@ package org.apache.hudi.common.table; import org.apache.avro.Schema; - import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; - import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.junit.jupiter.api.Test; @@ -37,24 +34,19 @@ public class TestTableSchemaResolver { public void testRecreateSchemaWhenDropPartitionColumns() { Schema originSchema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - // case1 - Option emptyPartitionFieldsOpt = Option.empty(); - Schema s1 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(emptyPartitionFieldsOpt, originSchema); - assertEquals(originSchema, s1); - // case2 String[] pts1 = new String[0]; - Schema s2 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts1), originSchema); + Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, pts1); assertEquals(originSchema, s2); // case3: partition_path is in originSchema String[] pts2 = {"partition_path"}; - Schema s3 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts2), originSchema); + Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, pts2); assertEquals(originSchema, s3); // case4: user_partition is not in originSchema String[] pts3 = {"user_partition"}; - Schema s4 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts3), originSchema); + Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, pts3); assertNotEquals(originSchema, s4); assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition"))); Schema.Field f = s4.getField("user_partition"); @@ -63,7 +55,7 @@ public void testRecreateSchemaWhenDropPartitionColumns() { // case5: user_partition is in originSchema, but partition_path is in originSchema String[] pts4 = {"user_partition", "partition_path"}; try { - TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts3), originSchema); + TableSchemaResolver.appendPartitionColumns(originSchema, pts3); } catch (HoodieIncompatibleSchemaException e) { assertTrue(e.getMessage().contains("Partial partition fields are still in the schema")); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index c1229d55009e4..eee5a4881c9b9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -213,27 +213,18 @@ class DefaultSource extends RelationProvider globPaths: Seq[Path], userSchema: Option[StructType], metaClient: HoodieTableMetaClient, - optParams: Map[String, String]) = { + optParams: Map[String, String]): BaseRelation = { val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, 
globPaths) - val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema // NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of // [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/ // vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]]. // // You can check out HUDI-3896 for more details - if (enableSchemaOnRead) { + if (baseRelation.hasSchemaOnRead) { baseRelation } else { baseRelation.toHadoopFsRelation } } - - private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) = - try { - new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata - .orElse(InternalSchema.getEmptyInternalSchema) - } catch { - case _: Exception => InternalSchema.getEmptyInternalSchema - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 08f87816d7c35..4a12256c4330a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -122,9 +122,13 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) .map(HoodieSqlCommonUtils.formatQueryInstant) + /** + * NOTE: Initialization of the following members is coupled on purpose to minimize the amount of I/O + * required to fetch table's Avro and Internal schemas + */ protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = { - val schemaUtil = new TableSchemaResolver(metaClient) - val avroSchema = Try(schemaUtil.getTableAvroSchema) match { + val schemaResolver = new TableSchemaResolver(metaClient) + val avroSchema = Try(schemaResolver.getTableAvroSchema) match { case Success(schema) => schema case Failure(e) => logWarning("Failed to fetch schema from the table", e) @@ -137,14 +141,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, } // try to find internalSchema val internalSchemaFromMeta = try { - schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) + schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) } catch { case _: Exception => InternalSchema.getEmptyInternalSchema } (avroSchema, internalSchemaFromMeta) } - protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) @@ -196,7 +200,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * meaning that regardless of whether this columns are being requested by the query they will be fetched * regardless so that relation is able to combine records properly (if necessary) * - * @VisibleInTests + * @VisibleForTesting */ val mandatoryFields: Seq[String] @@ -215,6 +219,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def queryTimestamp: Option[String] = specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) + /** + * Returns true in case the table supports Schema on Read (Schema
Evolution) + */ + def hasSchemaOnRead: Boolean = !internalSchema.isEmptySchema + override def schema: StructType = tableStructSchema /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index d9d5812adbe2f..81b12dbcb6e4a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -58,9 +58,9 @@ class IncrementalRelation(val sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[IncrementalRelation]) val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema - private val basePath = metaClient.getBasePath + private val basePath = metaClient.getBasePathV2 // TODO : Figure out a valid HoodieWriteConfig - private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(), + private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath.toString).build(), new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), metaClient) private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() @@ -98,7 +98,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) { if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else - schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last) + schemaResolver.getTableAvroSchema(commitsToReturn.last, false) } else { schemaResolver.getTableAvroSchemaWithoutMetadataFields() } @@ -202,7 +202,7 @@ class IncrementalRelation(val sqlContext: SQLContext, var doFullTableScan = false if (fallbackToFullTableScan) { - val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration); + val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration); val timer = new HoodieTimer().startTimer(); val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths @@ -223,7 +223,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val hudiDF = sqlContext.read .format("hudi_v1") .schema(usedSchema) - .load(basePath) + .load(basePath.toString) .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))) .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala index 85e1925bc1655..3258c7536d1c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala @@ -210,7 +210,7 @@ class TestTableSchemaResolverWithSparkSQL { .setConf(spark.sessionState.newHadoopConf()) .build() - assertTrue(new TableSchemaResolver(metaClient).isHasOperationField) + assertTrue(new TableSchemaResolver(metaClient).hasOperationField) schemaValuationBasedOnDataFile(metaClient, schema.toString()) } diff 
--git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 760d1269c2e42..fc9de60c67374 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -615,7 +615,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { .setConf(spark.sessionState.newHadoopConf()) .build() - assertResult(true)(new TableSchemaResolver(metaClient).isHasOperationField) + assertResult(true)(new TableSchemaResolver(metaClient).hasOperationField) spark.sql( s"""