From 1c5538a10b227074dad563229e3714d9a4ec0d4c Mon Sep 17 00:00:00 2001 From: xiaoxingstack Date: Mon, 31 Oct 2022 17:51:46 +0800 Subject: [PATCH] [HUDI-5107] Fix hadoop config in DirectWriteMarkers, HoodieFlinkEngineContext and StreamerUtil are not consistent issue --- .../java/org/apache/hudi/table/marker/DirectWriteMarkers.java | 2 +- .../apache/hudi/client/common/HoodieFlinkEngineContext.java | 4 ++++ .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index e813382079634..b5c9b3004f1c6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -106,7 +106,7 @@ public Set createdAndMergedDataPaths(HoodieEngineContext context, int pa context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths"); dataFiles.addAll(context.flatMap(subDirectories, directory -> { Path path = new Path(directory); - FileSystem fileSystem = path.getFileSystem(serializedConf.get()); + FileSystem fileSystem = FSUtils.getFs(path, serializedConf.get()); RemoteIterator itr = fileSystem.listFiles(path, true); List result = new ArrayList<>(); while (itr.hasNext()) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 5e38c24d3091a..c9136da6bb453 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -67,6 +67,10 @@ private HoodieFlinkEngineContext() { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier()); } + public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration hadoopConf) { + this(new SerializableConfiguration(hadoopConf), new DefaultTaskContextSupplier()); + } + public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index cdacefbf173e4..35446ce76862b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -423,7 +423,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); // build the write client to start the embedded timeline server - final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); + final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), writeConfig); writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); // create the filesystem view storage properties for client final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();