diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index bad592aa21d28..1836857383554 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -20,12 +20,16 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; @@ -381,8 +385,8 @@ private List buildFileIndex() { } private InputFormat getStreamInputFormat() { - // if table does not exist, use schema from the DDL - Schema tableAvroSchema = this.metaClient == null ? inferSchemaFromDdl() : getTableAvroSchema(); + // if table does not exist or table data does not exist, use schema from the DDL + Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableAvroSchema(); final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType(); @@ -399,6 +403,15 @@ private List buildFileIndex() { throw new HoodieException(errMsg); } + /** + * Returns whether the hoodie table data exists . + */ + private boolean tableDataExists() { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + Option> instantAndCommitMetadata = activeTimeline.getLastCommitMetadataWithValidData(); + return instantAndCommitMetadata.isPresent(); + } + private MergeOnReadInputFormat mergeOnReadInputFormat( RowType rowType, RowType requiredRowType,