Skip to content

Commit 909a6a1

Browse files
qinjunjerrysjwiesman
authored andcommitted
[FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info
This closes apache#16542
1 parent 600ce81 commit 909a6a1

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
/** This output format copies files from an existing savepoint into a new directory. */
3434
@Internal
35-
public final class FileCopyFunction implements OutputFormat<String> {
35+
public final class FileCopyFunction implements OutputFormat<Path> {
3636

3737
private static final long serialVersionUID = 1L;
3838

@@ -52,8 +52,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
5252
}
5353

5454
@Override
55-
public void writeRecord(String record) throws IOException {
56-
Path sourcePath = new Path(record);
55+
public void writeRecord(Path sourcePath) throws IOException {
5756
Path destPath = new Path(path, sourcePath.getName());
5857
try (FSDataOutputStream os =
5958
destPath.getFileSystem()

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,18 @@
3434

3535
/** Extracts all file paths that are part of the provided {@link OperatorState}. */
3636
@Internal
37-
public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
37+
public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {
3838

3939
private static final long serialVersionUID = 1L;
4040

4141
@Override
42-
public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
42+
public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
4343
for (OperatorSubtaskState subTaskState : operatorState.getSubtaskStates().values()) {
4444
// managed operator state
4545
for (OperatorStateHandle operatorStateHandle : subTaskState.getManagedOperatorState()) {
4646
Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
4747
if (path != null) {
48-
out.collect(path.getPath());
48+
out.collect(path);
4949
}
5050
}
5151
// managed keyed state
@@ -55,15 +55,15 @@ public void flatMap(OperatorState operatorState, Collector<String> out) throws E
5555
getStateFilePathFromStreamStateHandle(
5656
(KeyGroupsStateHandle) keyedStateHandle);
5757
if (path != null) {
58-
out.collect(path.getPath());
58+
out.collect(path);
5959
}
6060
}
6161
}
6262
// raw operator state
6363
for (OperatorStateHandle operatorStateHandle : subTaskState.getRawOperatorState()) {
6464
Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
6565
if (path != null) {
66-
out.collect(path.getPath());
66+
out.collect(path);
6767
}
6868
}
6969
// raw keyed state
@@ -73,7 +73,7 @@ public void flatMap(OperatorState operatorState, Collector<String> out) throws E
7373
getStateFilePathFromStreamStateHandle(
7474
(KeyGroupsStateHandle) keyedStateHandle);
7575
if (path != null) {
76-
out.collect(path.getPath());
76+
out.collect(path);
7777
}
7878
}
7979
}

0 commit comments

Comments
 (0)