diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index aa521fcc3e551..b37fa0302a051 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -837,8 +837,12 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertExceptio
     try {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
+      Option<Schema> existingTableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
+      if (!existingTableSchema.isPresent()) {
+        return;
+      }
       Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
+      Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
       AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 9b31a51d92504..02b1ef352515b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -68,6 +68,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
@@ -113,8 +114,12 @@ public TableSchemaResolver(HoodieTableMetaClient metaClient) {
     this.hasOperationField = Lazy.lazily(this::hasOperationField);
   }
 
-  public Schema getTableAvroSchemaFromDataFile() {
-    return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+  public Schema getTableAvroSchemaFromDataFile() throws Exception {
+    return getTableAvroSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
+  }
+
+  private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
+    return getTableParquetSchemaFromDataFile().map(this::convertParquetSchemaToAvro);
   }
 
   /**
@@ -135,7 +140,7 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
-    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).orElseThrow(schemaNotFoundError());
   }
 
   /**
@@ -148,7 +153,8 @@ public Schema getTableAvroSchema(String timestamp) throws Exception {
         .filterCompletedInstants()
         .findInstantsBeforeOrEquals(timestamp)
         .lastInstant();
-    return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
+    return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant)
+        .orElseThrow(schemaNotFoundError());
   }
 
   /**
@@ -157,7 +163,7 @@ public Schema getTableAvroSchema(String timestamp) throws Exception {
    * @param instant as of which table's schema will be fetched
    */
   public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception {
-    return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant));
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError());
   }
 
   /**
@@ -188,11 +194,15 @@ public MessageType getTableParquetSchema(boolean includeMetadataField) throws Ex
    */
   @Deprecated
   public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
-    return getTableAvroSchema(false);
+    return getTableAvroSchemaInternal(false, Option.empty()).orElseThrow(schemaNotFoundError());
+  }
+
+  public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields) {
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
-    Schema schema =
+  private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
+    Option<Schema> schema =
         (instantOpt.isPresent()
             ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
             : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
@@ -203,18 +213,18 @@ private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<
                 ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get())
                 : tableSchema)
         )
-        .orElseGet(() -> {
-          Schema schemaFromDataFile = getTableAvroSchemaFromDataFile();
+        .or(() -> {
+          Option<Schema> schemaFromDataFile = getTableAvroSchemaFromDataFileInternal();
           return includeMetadataFields
               ? schemaFromDataFile
-              : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile);
+              : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
         });
 
     // TODO partition columns have to be appended in all read-paths
-    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
+    if (metaClient.getTableConfig().shouldDropPartitionColumns() && schema.isPresent()) {
       return metaClient.getTableConfig().getPartitionFields()
-          .map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
-          .orElse(schema);
+          .map(partitionFields -> appendPartitionColumns(schema.get(), Option.ofNullable(partitionFields)))
+          .or(() -> schema);
     }
 
     return schema;
@@ -257,7 +267,7 @@ private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, b
   /**
    * Fetches the schema for a table from any the table's data files
    */
-  private MessageType getTableParquetSchemaFromDataFile() {
+  private Option<MessageType> getTableParquetSchemaFromDataFile() {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
     try {
       switch (metaClient.getTableType()) {
@@ -270,10 +280,11 @@ private MessageType getTableParquetSchemaFromDataFile() {
           if (instantAndCommitMetadata.isPresent()) {
             HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
             Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator();
-            return fetchSchemaFromFiles(filePaths);
+            return Option.of(fetchSchemaFromFiles(filePaths));
           } else {
-            throw new IllegalArgumentException("Could not find any data file written for commit, "
+            LOG.warn("Could not find any data file written for commit, "
                 + "so could not get schema for table " + metaClient.getBasePath());
+            return Option.empty();
           }
         default:
           LOG.error("Unknown table type " + metaClient.getTableType());
@@ -308,7 +319,7 @@ private MessageType convertAvroSchemaToParquet(Schema schema) {
    */
   public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
     if (metaClient.isTimelineNonEmpty()) {
-      return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
+      return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
     }
 
     return Option.empty();
@@ -569,4 +580,8 @@ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]>
 
     return dataSchema;
   }
+
+  private Supplier<Exception> schemaNotFoundError() {
+    return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 7780204ae3d24..13b93d4c11fa9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -113,7 +113,7 @@ public static void setPreCombineField(Configuration conf, HoodieTableMetaClient
    * @param conf The configuration
    * @param metaClient The meta client
    */
-  public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) {
+  public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
     Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
     if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 6626084aa2bdf..2adfbc1c1022d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -1141,10 +1141,11 @@ private Schema getSchemaForWriteConfig(Schema targetSchema) {
             .build();
         int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
         if (totalCompleted > 0) {
-          try {
-            TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
-            newWriteSchema = schemaResolver.getTableAvroSchema(false);
-          } catch (IllegalArgumentException e) {
+          TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+          Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
+          if (tableSchema.isPresent()) {
+            newWriteSchema = tableSchema.get();
+          } else {
             LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
           }
         }