diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 8571390f53093..31aba2b2db132 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -136,7 +136,7 @@ public HoodieTableSource( List partitionKeys, String defaultPartName, Configuration conf) { - this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null); + this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null); } public HoodieTableSource( @@ -148,7 +148,8 @@ public HoodieTableSource( @Nullable FileIndex fileIndex, @Nullable List> requiredPartitions, @Nullable int[] requiredPos, - @Nullable Long limit) { + @Nullable Long limit, + @Nullable HoodieTableMetaClient metaClient) { this.schema = schema; this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType(); this.path = path; @@ -164,7 +165,7 @@ public HoodieTableSource( : requiredPos; this.limit = limit == null ? NO_LIMIT_CONSTANT : limit; this.hadoopConf = HadoopConfigurations.getHadoopConf(conf); - this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf); + this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient; this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf); } @@ -215,7 +216,7 @@ public ChangelogMode getChangelogMode() { @Override public DynamicTableSource copy() { return new HoodieTableSource(schema, path, partitionKeys, defaultPartName, - conf, fileIndex, requiredPartitions, requiredPos, limit); + conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index d8093793fcccc..10a7e44373573 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.utils.TestConfigurations; @@ -148,6 +149,14 @@ void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() { assertEquals(expectedFilters, actualFilters); } + @Test + void testHoodieSourceCachedMetaClient() { + HoodieTableSource tableSource = getEmptyStreamingSource(); + HoodieTableMetaClient metaClient = tableSource.getMetaClient(); + HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy(); + assertThat(metaClient, is(tableSourceCopy.getMetaClient())); + } + private HoodieTableSource getEmptyStreamingSource() { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path);