Skip to content

Commit b842230

Browse files
amazingJingMyasuka
authored and committed
[FLINK-23949][runtime][checkpoint] fix first incremental checkpoint after a savepoint will degenerate into a full checkpoint
1 parent bdb75a8 commit b842230

File tree

2 files changed

+197
-1
lines changed

2 files changed

+197
-1
lines changed

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,11 @@ protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
174174
@Override
175175
public void notifyCheckpointComplete(long completedCheckpointId) {
176176
synchronized (materializedSstFiles) {
177-
if (completedCheckpointId > lastCompletedCheckpointId) {
177+
// FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) makes sure
178+
// the notified checkpointId is not a savepoint, otherwise the next checkpoint will
179+
// degenerate into a full checkpoint
180+
if (completedCheckpointId > lastCompletedCheckpointId
181+
&& materializedSstFiles.keySet().contains(completedCheckpointId)) {
178182
materializedSstFiles
179183
.keySet()
180184
.removeIf(checkpointId -> checkpointId < completedCheckpointId);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.contrib.streaming.state.snapshot;
20+
21+
import org.apache.flink.api.common.state.StateDescriptor;
22+
import org.apache.flink.api.common.typeutils.base.IntSerializer;
23+
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
24+
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
25+
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
26+
import org.apache.flink.contrib.streaming.state.RocksDBResource;
27+
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
28+
import org.apache.flink.core.fs.CloseableRegistry;
29+
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
30+
import org.apache.flink.runtime.state.ArrayListSerializer;
31+
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
32+
import org.apache.flink.runtime.state.KeyGroupRange;
33+
import org.apache.flink.runtime.state.KeyedStateHandle;
34+
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
35+
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
36+
import org.apache.flink.runtime.state.SnapshotResult;
37+
import org.apache.flink.runtime.state.StateHandleID;
38+
import org.apache.flink.runtime.state.StreamStateHandle;
39+
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
40+
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
41+
import org.apache.flink.util.ResourceGuard;
42+
43+
import org.junit.Assert;
44+
import org.junit.Rule;
45+
import org.junit.Test;
46+
import org.junit.rules.TemporaryFolder;
47+
import org.rocksdb.ColumnFamilyHandle;
48+
import org.rocksdb.RocksDB;
49+
import org.rocksdb.RocksDBException;
50+
51+
import java.io.File;
52+
import java.io.IOException;
53+
import java.util.ArrayList;
54+
import java.util.LinkedHashMap;
55+
import java.util.Map;
56+
import java.util.Set;
57+
import java.util.SortedMap;
58+
import java.util.TreeMap;
59+
import java.util.UUID;
60+
import java.util.concurrent.RunnableFuture;
61+
62+
import static org.apache.flink.core.fs.Path.fromLocalFile;
63+
import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
64+
65+
/** Tests for {@link RocksIncrementalSnapshotStrategy}. */
66+
public class RocksIncrementalSnapshotStrategyTest {
67+
68+
@Rule public final TemporaryFolder tmp = new TemporaryFolder();
69+
70+
@Rule public RocksDBResource rocksDBResource = new RocksDBResource();
71+
72+
// Verify the next checkpoint is still incremental after a savepoint completed.
73+
@Test
74+
public void testCheckpointIsIncremental() throws Exception {
75+
76+
try (CloseableRegistry closeableRegistry = new CloseableRegistry();
77+
RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
78+
createSnapshotStrategy(closeableRegistry)) {
79+
FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
80+
81+
// make and notify checkpoint with id 1
82+
snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory);
83+
checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
84+
85+
// notify savepoint with id 2
86+
checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
87+
88+
// make checkpoint with id 3
89+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
90+
snapshot(3L, checkpointSnapshotStrategy, checkpointStreamFactory);
91+
92+
// If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd checkpoint is
93+
// incremental.
94+
Map<StateHandleID, StreamStateHandle> sharedState3 =
95+
incrementalRemoteKeyedStateHandle3.getSharedState();
96+
long placeholderStateHandleCount =
97+
sharedState3.entrySet().stream()
98+
.filter(e -> e.getValue() instanceof PlaceholderStreamStateHandle)
99+
.count();
100+
101+
Assert.assertTrue(placeholderStateHandleCount > 0);
102+
}
103+
}
104+
105+
public RocksIncrementalSnapshotStrategy createSnapshotStrategy(
106+
CloseableRegistry closeableRegistry) throws IOException, RocksDBException {
107+
108+
ColumnFamilyHandle columnFamilyHandle = rocksDBResource.createNewColumnFamily("test");
109+
RocksDB rocksDB = rocksDBResource.getRocksDB();
110+
byte[] key = "checkpoint".getBytes();
111+
byte[] val = "incrementalTest".getBytes();
112+
rocksDB.put(columnFamilyHandle, key, val);
113+
114+
// construct RocksIncrementalSnapshotStrategy
115+
long lastCompletedCheckpointId = -1L;
116+
ResourceGuard rocksDBResourceGuard = new ResourceGuard();
117+
SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
118+
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
119+
new LinkedHashMap<>();
120+
121+
RocksDBStateUploader rocksDBStateUploader =
122+
new RocksDBStateUploader(
123+
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue());
124+
125+
int keyGroupPrefixBytes =
126+
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2);
127+
128+
RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
129+
new RegisteredKeyValueStateBackendMetaInfo<>(
130+
StateDescriptor.Type.VALUE,
131+
"test",
132+
IntSerializer.INSTANCE,
133+
new ArrayListSerializer<>(IntSerializer.INSTANCE));
134+
135+
RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo =
136+
new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
137+
kvStateInformation.putIfAbsent("test", rocksDbKvStateInfo);
138+
139+
RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
140+
new RocksIncrementalSnapshotStrategy(
141+
rocksDB,
142+
rocksDBResourceGuard,
143+
IntSerializer.INSTANCE,
144+
kvStateInformation,
145+
new KeyGroupRange(0, 1),
146+
keyGroupPrefixBytes,
147+
TestLocalRecoveryConfig.disabled(),
148+
closeableRegistry,
149+
tmp.newFolder(),
150+
UUID.randomUUID(),
151+
materializedSstFiles,
152+
rocksDBStateUploader,
153+
lastCompletedCheckpointId);
154+
155+
return checkpointSnapshotStrategy;
156+
}
157+
158+
public FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException {
159+
int threshold = 100;
160+
File checkpointsDir = tmp.newFolder("checkpointsDir");
161+
File sharedStateDir = tmp.newFolder("sharedStateDir");
162+
FsCheckpointStreamFactory checkpointStreamFactory =
163+
new FsCheckpointStreamFactory(
164+
getSharedInstance(),
165+
fromLocalFile(checkpointsDir),
166+
fromLocalFile(sharedStateDir),
167+
threshold,
168+
threshold);
169+
return checkpointStreamFactory;
170+
}
171+
172+
public IncrementalRemoteKeyedStateHandle snapshot(
173+
long checkpointId,
174+
RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy,
175+
FsCheckpointStreamFactory checkpointStreamFactory)
176+
throws Exception {
177+
178+
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture =
179+
checkpointSnapshotStrategy.doSnapshot(
180+
checkpointId,
181+
checkpointId,
182+
checkpointStreamFactory,
183+
CheckpointOptions.forCheckpointWithDefaultLocation());
184+
snapshotRunnableFuture.run();
185+
186+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
187+
(IncrementalRemoteKeyedStateHandle)
188+
snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
189+
190+
return incrementalRemoteKeyedStateHandle;
191+
}
192+
}

0 commit comments

Comments
 (0)