diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index bccba2f78d122..87f27d2247ac5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -66,6 +66,7 @@ public class FileIndex {
   private final RowType rowType;
   private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
+  private final org.apache.hadoop.conf.Configuration hadoopConf;
   private final PartitionPruners.PartitionPruner partitionPruner; // for partition pruning
   private final DataPruner dataPruner; // for data skipping
   private final int dataBucket; // for bucket pruning
@@ -74,7 +75,8 @@ public class FileIndex {
   private FileIndex(Path path, Configuration conf, RowType rowType, DataPruner dataPruner, PartitionPruners.PartitionPruner partitionPruner, int dataBucket) {
     this.path = path;
     this.rowType = rowType;
-    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
+    this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+    this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
     this.metadataConfig = metadataConfig(conf);
     this.dataPruner = isDataSkippingFeasible(conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? dataPruner : null;
     this.partitionPruner = partitionPruner;
@@ -145,7 +147,8 @@ public FileStatus[] getFilesInPartitions() {
       return new FileStatus[0];
     }
     String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
-    FileStatus[] allFiles = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(), partitions)
+    FileStatus[] allFiles = FSUtils.getFilesInPartitions(
+        new HoodieFlinkEngineContext(hadoopConf), metadataConfig, path.toString(), partitions)
         .values().stream()
         .flatMap(Arrays::stream)
         .toArray(FileStatus[]::new);
@@ -271,7 +274,7 @@ public List<String> getOrBuildPartitionPaths() {
       return this.partitionPaths;
     }
     List<String> allPartitionPaths = this.tableExists
-        ? FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString())
+        ? FSUtils.getAllPartitionPaths(new HoodieFlinkEngineContext(hadoopConf), metadataConfig, path.toString())
         : Collections.emptyList();
     if (this.partitionPruner == null) {
       this.partitionPaths = allPartitionPaths;