26
26
import org .apache .flink .runtime .state .CheckpointStateOutputStream ;
27
27
import org .apache .flink .runtime .state .CheckpointStreamFactory ;
28
28
import org .apache .flink .runtime .state .CheckpointedStateScope ;
29
- import org .apache .flink .runtime .state .IncrementalKeyedStateHandle ;
29
+ import org .apache .flink .runtime .state .IncrementalKeyedStateHandle . HandleAndLocalPath ;
30
30
import org .apache .flink .runtime .state .StateUtil ;
31
31
import org .apache .flink .runtime .state .StreamStateHandle ;
32
+ import org .apache .flink .runtime .state .filesystem .FileStateHandle ;
33
+ import org .apache .flink .state .forst .fs .ForStFlinkFileSystem ;
34
+ import org .apache .flink .state .forst .fs .filemapping .FileBackedMappingEntrySource ;
35
+ import org .apache .flink .state .forst .fs .filemapping .HandleBackedMappingEntrySource ;
36
+ import org .apache .flink .state .forst .fs .filemapping .MappingEntry ;
37
+ import org .apache .flink .state .forst .fs .filemapping .MappingEntrySource ;
32
38
import org .apache .flink .util .IOUtils ;
33
-
34
- import java .io .IOException ;
35
- import java .io .InputStream ;
39
+ import org .apache .flink .util .Preconditions ;
36
40
37
41
import javax .annotation .Nonnull ;
38
42
import javax .annotation .Nullable ;
39
43
44
+ import java .io .IOException ;
45
+ import java .io .InputStream ;
46
+ import java .util .Collections ;
47
+ import java .util .List ;
48
+
40
49
/**
41
50
* Data transfer strategy for ForSt DB without a remote DB path. It always copies the file to/from
42
51
* checkpoint storage when transferring data.
@@ -54,7 +63,7 @@ public class CopyDataTransferStrategy extends DataTransferStrategy {
54
63
}
55
64
56
65
@ Override
57
- public IncrementalKeyedStateHandle . HandleAndLocalPath transferToCheckpoint (
66
+ public HandleAndLocalPath transferToCheckpoint (
58
67
Path dbFilePath ,
59
68
long maxTransferBytes ,
60
69
CheckpointStreamFactory checkpointStreamFactory ,
@@ -63,7 +72,6 @@ public IncrementalKeyedStateHandle.HandleAndLocalPath transferToCheckpoint(
63
72
CloseableRegistry tmpResourcesRegistry )
64
73
throws IOException {
65
74
66
- LOG .trace ("Copy file to checkpoint: {}" , dbFilePath );
67
75
if (maxTransferBytes < 0 ) {
68
76
// Means transfer whole file to checkpoint storage.
69
77
maxTransferBytes = Long .MAX_VALUE ;
@@ -92,7 +100,7 @@ public String toString() {
92
100
return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem + '}' ;
93
101
}
94
102
95
- private static IncrementalKeyedStateHandle . HandleAndLocalPath copyFileToCheckpoint (
103
+ private static HandleAndLocalPath copyFileToCheckpoint (
96
104
FileSystem dbFileSystem ,
97
105
Path filePath ,
98
106
long maxTransferBytes ,
@@ -101,7 +109,87 @@ private static IncrementalKeyedStateHandle.HandleAndLocalPath copyFileToCheckpoi
101
109
CloseableRegistry closeableRegistry ,
102
110
CloseableRegistry tmpResourcesRegistry )
103
111
throws IOException {
112
+ StreamStateHandle handleByDuplicating =
113
+ duplicateFileToCheckpoint (
114
+ dbFileSystem , filePath , checkpointStreamFactory , stateScope );
115
+ if (handleByDuplicating != null ) {
116
+ LOG .trace ("Duplicate file to checkpoint: {} {}" , filePath , handleByDuplicating );
117
+ return HandleAndLocalPath .of (handleByDuplicating , filePath .getName ());
118
+ }
119
+
120
+ HandleAndLocalPath handleAndLocalPath =
121
+ HandleAndLocalPath .of (
122
+ writeFileToCheckpoint (
123
+ dbFileSystem ,
124
+ filePath ,
125
+ maxTransferBytes ,
126
+ checkpointStreamFactory ,
127
+ stateScope ,
128
+ closeableRegistry ,
129
+ tmpResourcesRegistry ),
130
+ filePath .getName ());
131
+ LOG .trace ("Write file to checkpoint: {}, {}" , filePath , handleAndLocalPath .getHandle ());
132
+ return handleAndLocalPath ;
133
+ }
134
+
135
+ /**
136
+ * Duplicate file to checkpoint storage by calling {@link CheckpointStreamFactory#duplicate} if
137
+ * possible.
138
+ */
139
+ private static @ Nullable StreamStateHandle duplicateFileToCheckpoint (
140
+ FileSystem dbFileSystem ,
141
+ Path filePath ,
142
+ CheckpointStreamFactory checkpointStreamFactory ,
143
+ CheckpointedStateScope stateScope )
144
+ throws IOException {
145
+
146
+ StreamStateHandle stateHandle = getStateHandle (dbFileSystem , filePath );
147
+
148
+ if (!checkpointStreamFactory .canFastDuplicate (stateHandle , stateScope )) {
149
+ return null ;
150
+ }
151
+
152
+ List <StreamStateHandle > result =
153
+ checkpointStreamFactory .duplicate (
154
+ Collections .singletonList (stateHandle ), stateScope );
155
+ return result .get (0 );
156
+ }
104
157
158
+ private static StreamStateHandle getStateHandle (FileSystem dbFileSystem , Path filePath )
159
+ throws IOException {
160
+ Path sourceFilePath = filePath ;
161
+ if (dbFileSystem instanceof ForStFlinkFileSystem ) {
162
+ MappingEntry mappingEntry =
163
+ ((ForStFlinkFileSystem ) dbFileSystem ).getMappingEntry (filePath );
164
+ Preconditions .checkNotNull (
165
+ mappingEntry , "File mapping entry not found for %s" , filePath );
166
+
167
+ MappingEntrySource source = mappingEntry .getSource ();
168
+ if (source instanceof HandleBackedMappingEntrySource ) {
169
+ // return the state handle stored in MappingEntry
170
+ return ((HandleBackedMappingEntrySource ) source ).getStateHandle ();
171
+ } else {
172
+ // use file path stored in MappingEntry
173
+ sourceFilePath = ((FileBackedMappingEntrySource ) source ).getFilePath ();
174
+ }
175
+ }
176
+
177
+ // construct a FileStateHandle base on source file
178
+ FileSystem sourceFileSystem = sourceFilePath .getFileSystem ();
179
+ long fileLength = sourceFileSystem .getFileStatus (sourceFilePath ).getLen ();
180
+ return new FileStateHandle (sourceFilePath , fileLength );
181
+ }
182
+
183
+ /** Write file to checkpoint storage through {@link CheckpointStateOutputStream}. */
184
+ private static @ Nullable StreamStateHandle writeFileToCheckpoint (
185
+ FileSystem dbFileSystem ,
186
+ Path filePath ,
187
+ long maxTransferBytes ,
188
+ CheckpointStreamFactory checkpointStreamFactory ,
189
+ CheckpointedStateScope stateScope ,
190
+ CloseableRegistry closeableRegistry ,
191
+ CloseableRegistry tmpResourcesRegistry )
192
+ throws IOException {
105
193
InputStream inputStream = null ;
106
194
CheckpointStateOutputStream outputStream = null ;
107
195
@@ -137,14 +225,11 @@ private static IncrementalKeyedStateHandle.HandleAndLocalPath copyFileToCheckpoi
137
225
tmpResourcesRegistry .registerCloseable (
138
226
() -> StateUtil .discardStateObjectQuietly (result ));
139
227
140
- return IncrementalKeyedStateHandle .HandleAndLocalPath .of (result , filePath .getName ());
141
-
228
+ return result ;
142
229
} finally {
143
-
144
230
if (closeableRegistry .unregisterCloseable (inputStream )) {
145
231
IOUtils .closeQuietly (inputStream );
146
232
}
147
-
148
233
if (closeableRegistry .unregisterCloseable (outputStream )) {
149
234
IOUtils .closeQuietly (outputStream );
150
235
}
0 commit comments