diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index e6d2ddb7b5509..193c0abcd8d62 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -100,7 +100,7 @@ public class ITTestDataStreamWrite extends TestLogger {
   @ParameterizedTest
   @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
   public void testWriteCopyOnWrite(String indexType) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
     conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -146,7 +146,7 @@ public void testWriteCopyOnWriteWithChainedTransformer() throws Exception {
   @ParameterizedTest
   @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
   public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
     conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -167,7 +167,7 @@ public void testWriteCopyOnWriteWithSortClustering() throws Exception {
   }

   private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
@@ -182,7 +182,7 @@ private void testWriteToHoodie(
       Transformer transformer,
       String jobName,
       Map<String, List<String>> expected) throws Exception {
-    testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()),
+    testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.toURI().toString()),
         Option.of(transformer), jobName, 2, expected);
   }

@@ -336,7 +336,7 @@ public void testHoodiePipelineBuilderSource() throws Exception {
     // set up checkpoint interval
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
     conf.setString(FlinkOptions.TABLE_NAME, "t1");
     conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

@@ -345,10 +345,10 @@ public void testHoodiePipelineBuilderSource() throws Exception {
     TestData.writeData(TestData.dataSetInsert(3, 4), conf);
     TestData.writeData(TestData.dataSetInsert(5, 6), conf);

-    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());

     Map<String, String> options = new HashMap<>();
-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
     options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);

     //read a hoodie table use low-level source api.
@@ -378,7 +378,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {
     execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
     execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

-    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
     Configuration conf = Configuration.fromMap(options);
     // Read from file source
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index e7e34cc15ad0e..8e1dd9964cb0b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -626,8 +626,8 @@ public static void checkWrittenDataCOW(
       Map<String, List<String>> expected) throws IOException {

     // 1. init flink table
-    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toURI().toString());
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.toURI().toString()).build();
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);

     // 2. check each partition data
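Note (not part of the patch): the only functional change above is replacing File#getAbsolutePath() with File#toURI().toString() when building the table base path, so the tests hand Hudi a scheme-qualified file: URI instead of a bare local path. A minimal standalone sketch of what the two calls return on a Unix-like system; the directory name is hypothetical:

```java
import java.io.File;

public class PathFormDemo {
  public static void main(String[] args) {
    File tempFile = new File("/tmp/hudi-it");         // hypothetical test directory

    // Bare filesystem path, no scheme:
    System.out.println(tempFile.getAbsolutePath());   // /tmp/hudi-it

    // Scheme-qualified URI (a trailing '/' is appended only if the directory exists):
    System.out.println(tempFile.toURI().toString());  // file:/tmp/hudi-it
  }
}
```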