Skip to content

Commit eb43ab6

Browse files
authored
Implement leader rate limiting for file restore (#37677)
This is related to #35975. This commit implements rate limiting on the leader side using the CombinedRateLimiter.
1 parent 2ba9e36 commit eb43ab6

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
import org.elasticsearch.common.bytes.BytesReference;
1717
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1818
import org.elasticsearch.common.lease.Releasable;
19+
import org.elasticsearch.common.metrics.CounterMetric;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.unit.TimeValue;
22+
import org.elasticsearch.common.util.CombinedRateLimiter;
2123
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
2224
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2325
import org.elasticsearch.common.util.concurrent.KeyedLock;
@@ -42,6 +44,7 @@
4244
import java.util.concurrent.ConcurrentHashMap;
4345
import java.util.concurrent.CopyOnWriteArrayList;
4446
import java.util.function.Consumer;
47+
import java.util.function.LongConsumer;
4548

4649
public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {
4750

@@ -52,6 +55,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
5255
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
5356
private final ThreadPool threadPool;
5457
private final CcrSettings ccrSettings;
58+
private final CounterMetric throttleTime = new CounterMetric();
5559

5660
public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) {
5761
this.threadPool = threadPool;
@@ -136,7 +140,7 @@ public synchronized SessionReader getSessionReader(String sessionUUID) {
136140
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
137141
}
138142
restore.idle = false;
139-
return new SessionReader(restore);
143+
return new SessionReader(restore, ccrSettings, throttleTime::inc);
140144
}
141145

142146
private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) {
@@ -182,6 +186,10 @@ private void maybeTimeout(String sessionUUID) {
182186
}
183187
}
184188

189+
public long getThrottleTime() {
190+
return this.throttleTime.count();
191+
}
192+
185193
private static class RestoreSession extends AbstractRefCounted {
186194

187195
private final String sessionUUID;
@@ -254,9 +262,13 @@ protected void closeInternal() {
254262
public static class SessionReader implements Closeable {
255263

256264
private final RestoreSession restoreSession;
265+
private final CcrSettings ccrSettings;
266+
private final LongConsumer throttleListener;
257267

258-
private SessionReader(RestoreSession restoreSession) {
268+
private SessionReader(RestoreSession restoreSession, CcrSettings ccrSettings, LongConsumer throttleListener) {
259269
this.restoreSession = restoreSession;
270+
this.ccrSettings = ccrSettings;
271+
this.throttleListener = throttleListener;
260272
restoreSession.incRef();
261273
}
262274

@@ -270,6 +282,9 @@ private SessionReader(RestoreSession restoreSession) {
270282
* @throws IOException if the read fails
271283
*/
272284
public long readFileBytes(String fileName, BytesReference reference) throws IOException {
285+
CombinedRateLimiter rateLimiter = ccrSettings.getRateLimiter();
286+
long throttleTime = rateLimiter.maybePause(reference.length());
287+
throttleListener.accept(throttleTime);
273288
return restoreSession.readFileBytes(fileName, reference);
274289
}
275290

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,15 @@ public void testDocsAreRecovered() throws Exception {
238238
}
239239

240240
public void testRateLimitingIsEmployed() throws Exception {
241+
boolean followerRateLimiting = randomBoolean();
242+
241243
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
242244
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
243-
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
245+
if (followerRateLimiting) {
246+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
247+
} else {
248+
assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet());
249+
}
244250

245251
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
246252
String leaderIndex = "index1";
@@ -256,11 +262,15 @@ public void testRateLimitingIsEmployed() throws Exception {
256262
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
257263

258264
List<CcrRepository> repositories = new ArrayList<>();
265+
List<CcrRestoreSourceService> restoreSources = new ArrayList<>();
259266

260267
for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
261268
Repository repository = repositoriesService.repository(leaderClusterRepoName);
262269
repositories.add((CcrRepository) repository);
263270
}
271+
for (CcrRestoreSourceService restoreSource : getLeaderCluster().getDataOrMasterNodeInstances(CcrRestoreSourceService.class)) {
272+
restoreSources.add(restoreSource);
273+
}
264274

265275
logger.info("--> indexing some data");
266276
for (int i = 0; i < 100; i++) {
@@ -282,12 +292,20 @@ public void testRateLimitingIsEmployed() throws Exception {
282292
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
283293
future.actionGet();
284294

285-
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
295+
if (followerRateLimiting) {
296+
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
297+
} else {
298+
assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0));
299+
}
286300

287301
settingsRequest = new ClusterUpdateSettingsRequest();
288302
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
289303
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
290-
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
304+
if (followerRateLimiting) {
305+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
306+
} else {
307+
assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet());
308+
}
291309
}
292310

293311
public void testFollowerMappingIsUpdated() throws IOException {

0 commit comments

Comments
 (0)