Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
Expand All @@ -42,6 +44,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {

Expand All @@ -52,6 +55,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
private final ThreadPool threadPool;
private final CcrSettings ccrSettings;
private final CounterMetric throttleTime = new CounterMetric();

public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) {
this.threadPool = threadPool;
Expand Down Expand Up @@ -136,7 +140,7 @@ public synchronized SessionReader getSessionReader(String sessionUUID) {
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
}
restore.idle = false;
return new SessionReader(restore);
return new SessionReader(restore, ccrSettings, throttleTime::inc);
}

private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) {
Expand Down Expand Up @@ -182,6 +186,10 @@ private void maybeTimeout(String sessionUUID) {
}
}

public long getThrottleTime() {
return this.throttleTime.count();
}

private static class RestoreSession extends AbstractRefCounted {

private final String sessionUUID;
Expand Down Expand Up @@ -254,9 +262,13 @@ protected void closeInternal() {
public static class SessionReader implements Closeable {

private final RestoreSession restoreSession;
private final CcrSettings ccrSettings;
private final LongConsumer throttleListener;

private SessionReader(RestoreSession restoreSession) {
private SessionReader(RestoreSession restoreSession, CcrSettings ccrSettings, LongConsumer throttleListener) {
this.restoreSession = restoreSession;
this.ccrSettings = ccrSettings;
this.throttleListener = throttleListener;
restoreSession.incRef();
}

Expand All @@ -270,6 +282,9 @@ private SessionReader(RestoreSession restoreSession) {
* @throws IOException if the read fails
*/
public long readFileBytes(String fileName, BytesReference reference) throws IOException {
CombinedRateLimiter rateLimiter = ccrSettings.getRateLimiter();
long throttleTime = rateLimiter.maybePause(reference.length());
throttleListener.accept(throttleTime);
return restoreSession.readFileBytes(fileName, reference);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,15 @@ public void testDocsAreRecovered() throws Exception {
}

public void testRateLimitingIsEmployed() throws Exception {
boolean followerRateLimiting = randomBoolean();

ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
if (followerRateLimiting) {
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
} else {
assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}

String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
Expand All @@ -256,11 +262,15 @@ public void testRateLimitingIsEmployed() throws Exception {
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

List<CcrRepository> repositories = new ArrayList<>();
List<CcrRestoreSourceService> restoreSources = new ArrayList<>();

for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
Repository repository = repositoriesService.repository(leaderClusterRepoName);
repositories.add((CcrRepository) repository);
}
for (CcrRestoreSourceService restoreSource : getLeaderCluster().getDataOrMasterNodeInstances(CcrRestoreSourceService.class)) {
restoreSources.add(restoreSource);
}

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
Expand All @@ -282,12 +292,20 @@ public void testRateLimitingIsEmployed() throws Exception {
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
future.actionGet();

assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
if (followerRateLimiting) {
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
} else {
assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0));
}

settingsRequest = new ClusterUpdateSettingsRequest();
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
if (followerRateLimiting) {
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
} else {
assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}
}

public void testFollowerMappingIsUpdated() throws IOException {
Expand Down