@@ -100,7 +100,7 @@ public class ITTestDataStreamWrite extends TestLogger {
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testWriteCopyOnWrite(String indexType) throws Exception {
-Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
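
Every hunk in this diff makes the same one-line swap: `tempFile.getAbsolutePath()` becomes `tempFile.toURI().toString()`, so the table base path is passed as an explicit `file:` URI string rather than a bare local path. A minimal standalone sketch of what the two calls return for the same directory (the class name and directory prefix below are illustrative, not part of the PR):

```java
import java.io.File;
import java.nio.file.Files;

public class PathVsUriDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the JUnit-managed tempFile used by the tests.
        File tempFile = Files.createTempDirectory("hudi-it").toFile();

        // Bare local path, no scheme, e.g. /tmp/hudi-it1234567890
        System.out.println(tempFile.getAbsolutePath());

        // file: URI string with an explicit scheme, e.g. file:/tmp/hudi-it1234567890/
        // (File.toURI() appends a trailing slash for directories.)
        System.out.println(tempFile.toURI().toString());
    }
}
```

Because the URI form carries its own scheme, downstream path resolution does not have to guess which filesystem a scheme-less string refers to.
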
@@ -146,7 +146,7 @@ public void testWriteCopyOnWriteWithChainedTransformer() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testWriteMergeOnReadWithCompaction(String indexType) throws Exception {
-Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
@@ -167,7 +167,7 @@ public void testWriteCopyOnWriteWithSortClustering() throws Exception {
}

private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
-Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
@@ -182,7 +182,7 @@ private void testWriteToHoodie(
Transformer transformer,
String jobName,
Map<String, List<String>> expected) throws Exception {
-testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()),
+testWriteToHoodie(TestConfigurations.getDefaultConf(tempFile.toURI().toString()),
Option.of(transformer), jobName, 2, expected);
}

@@ -336,7 +336,7 @@ public void testHoodiePipelineBuilderSource() throws Exception {
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

@@ -345,10 +345,10 @@ public void testHoodiePipelineBuilderSource() throws Exception {
TestData.writeData(TestData.dataSetInsert(3, 4), conf);
TestData.writeData(TestData.dataSetInsert(5, 6), conf);

-String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+String latestCommit = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());

Map<String, String> options = new HashMap<>();
-options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);

//read a hoodie table use low-level source api.
@@ -378,7 +378,7 @@ public void testHoodiePipelineBuilderSink() throws Exception {
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

-options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
@@ -626,8 +626,8 @@ public static void checkWrittenDataCOW(
Map<String, List<String>> expected) throws IOException {

// 1. init flink table
-HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
-HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
+HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toURI().toString());
+HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.toURI().toString()).build();
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);

// 2. check each partition data