Skip to content

Commit fadb89c

Browse files
authored
Ensure CCR partial reads never overuse buffer (#58620)
When the documents are large, a follower can receive a partial response because the requested range of operations is capped by max_read_request_size instead of max_read_request_operation_count. In this case, the follower will continue reading the subsequent ranges without checking the remaining size of the buffer. The buffer can then use more memory than max_write_buffer_size and even cause an OOM.
1 parent 045bdca commit fadb89c

File tree

3 files changed

+83
-11
lines changed

3 files changed

+83
-11
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
9191
private long failedWriteRequests = 0;
9292
private long operationWritten = 0;
9393
private long lastFetchTime = -1;
94+
private final Queue<Tuple<Long, Long>> partialReadRequests = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
9495
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
9596
private long bufferSizeInBytes = 0;
9697
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
@@ -188,6 +189,20 @@ synchronized void coordinateReads() {
188189

189190
LOGGER.trace("{} coordinate reads, lastRequestedSeqNo={}, leaderGlobalCheckpoint={}",
190191
params.getFollowShardId(), lastRequestedSeqNo, leaderGlobalCheckpoint);
192+
assert partialReadRequests.size() <= params.getMaxOutstandingReadRequests() :
193+
"too many partial read requests [" + partialReadRequests + "]";
194+
while (hasReadBudget() && partialReadRequests.isEmpty() == false) {
195+
final Tuple<Long, Long> range = partialReadRequests.remove();
196+
assert range.v1() <= range.v2() && range.v2() <= lastRequestedSeqNo :
197+
"invalid partial range [" + range.v1() + "," + range.v2() + "]; last requested seq_no [" + lastRequestedSeqNo + "]";
198+
final long fromSeqNo = range.v1();
199+
final long maxRequiredSeqNo = range.v2();
200+
final int requestOpCount = Math.toIntExact(maxRequiredSeqNo - fromSeqNo + 1);
201+
LOGGER.trace("{}[{} ongoing reads] continue partial read request from_seqno={} max_required_seqno={} batch_count={}",
202+
params.getFollowShardId(), numOutstandingReads, fromSeqNo, maxRequiredSeqNo, requestOpCount);
203+
numOutstandingReads++;
204+
sendShardChangesRequest(fromSeqNo, requestOpCount, maxRequiredSeqNo);
205+
}
191206
final int maxReadRequestOperationCount = params.getMaxReadRequestOperationCount();
192207
while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {
193208
final long from = lastRequestedSeqNo + 1;
@@ -203,8 +218,8 @@ synchronized void coordinateReads() {
203218
LOGGER.trace("{}[{} ongoing reads] read from_seqno={} max_required_seqno={} batch_count={}",
204219
params.getFollowShardId(), numOutstandingReads, from, maxRequiredSeqNo, requestOpCount);
205220
numOutstandingReads++;
206-
sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
207221
lastRequestedSeqNo = maxRequiredSeqNo;
222+
sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);
208223
}
209224

210225
if (numOutstandingReads == 0 && hasReadBudget()) {
@@ -220,6 +235,9 @@ synchronized void coordinateReads() {
220235

221236
private boolean hasReadBudget() {
222237
assert Thread.holdsLock(this);
238+
// TODO: To ensure that we never overuse the buffer, we need to
239+
// - Overestimate the size and count of the responses of the outstanding request when calculating the budget
240+
// - Limit the size and count of next read requests by the remaining size and count of the buffer
223241
if (numOutstandingReads >= params.getMaxOutstandingReadRequests()) {
224242
LOGGER.trace("{} no new reads, maximum number of concurrent reads have been reached [{}]",
225243
params.getFollowShardId(), numOutstandingReads);
@@ -229,7 +247,7 @@ private boolean hasReadBudget() {
229247
LOGGER.trace("{} no new reads, buffer size limit has been reached [{}]", params.getFollowShardId(), bufferSizeInBytes);
230248
return false;
231249
}
232-
if (buffer.size() > params.getMaxWriteBufferCount()) {
250+
if (buffer.size() >= params.getMaxWriteBufferCount()) {
233251
LOGGER.trace("{} no new reads, buffer count limit has been reached [{}]", params.getFollowShardId(), buffer.size());
234252
return false;
235253
}
@@ -374,16 +392,13 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
374392
"] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]";
375393
coordinateWrites();
376394
}
377-
if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {
378-
int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);
379-
LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...",
395+
if (newFromSeqNo <= maxRequiredSeqNo) {
396+
LOGGER.trace("{} received [{}] operations, enqueue partial read request [{}/{}]",
380397
params.getFollowShardId(), response.getOperations().length, newFromSeqNo, maxRequiredSeqNo);
381-
sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo);
382-
} else {
383-
// read is completed, decrement
384-
numOutstandingReads--;
385-
coordinateReads();
398+
partialReadRequests.add(Tuple.tuple(newFromSeqNo, maxRequiredSeqNo));
386399
}
400+
numOutstandingReads--;
401+
coordinateReads();
387402
}
388403

389404
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.common.collect.ImmutableOpenMap;
4343
import org.elasticsearch.common.network.NetworkModule;
4444
import org.elasticsearch.common.settings.Settings;
45+
import org.elasticsearch.common.unit.ByteSizeValue;
4546
import org.elasticsearch.common.unit.TimeValue;
4647
import org.elasticsearch.common.util.set.Sets;
4748
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -475,6 +476,8 @@ public static PutFollowAction.Request putFollow(String leaderIndex, String follo
475476
request.setFollowerIndex(followerIndex);
476477
request.getParameters().setMaxRetryDelay(TimeValue.timeValueMillis(10));
477478
request.getParameters().setReadPollTimeout(TimeValue.timeValueMillis(10));
479+
request.getParameters().setMaxReadRequestSize(new ByteSizeValue(between(1, 32 * 1024 * 1024)));
480+
request.getParameters().setMaxReadRequestOperationCount(between(1, 10000));
478481
request.waitForActiveShards(waitForActiveShards);
479482
return request;
480483
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ccr.action;
77

88
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.common.UUIDs;
1011
import org.elasticsearch.common.collect.Tuple;
1112
import org.elasticsearch.common.settings.Settings;
@@ -79,6 +80,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
7980
private Queue<Long> followerGlobalCheckpoints;
8081
private Queue<Long> maxSeqNos;
8182
private Queue<Integer> responseSizes;
83+
private Queue<ActionListener<BulkShardOperationsResponse>> pendingBulkShardRequests;
8284

8385
public void testCoordinateReads() {
8486
ShardFollowTaskParams params = new ShardFollowTaskParams();
@@ -599,6 +601,55 @@ public void testReceiveNothingExpectedSomething() {
599601
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
600602
}
601603

604+
public void testHandlePartialResponses() {
605+
ShardFollowTaskParams params = new ShardFollowTaskParams();
606+
params.maxReadRequestOperationCount = 10;
607+
params.maxOutstandingReadRequests = 2;
608+
params.maxOutstandingWriteRequests = 1;
609+
params.maxWriteBufferCount = 3;
610+
611+
ShardFollowNodeTask task = createShardFollowTask(params);
612+
startTask(task, 99, -1);
613+
614+
task.coordinateReads();
615+
assertThat(shardChangesRequests.size(), equalTo(2));
616+
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
617+
assertThat(shardChangesRequests.get(0)[1], equalTo(10L));
618+
assertThat(shardChangesRequests.get(1)[0], equalTo(10L));
619+
assertThat(shardChangesRequests.get(1)[1], equalTo(10L));
620+
621+
task.innerHandleReadResponse(0L, 9L, generateShardChangesResponse(0L, 5L, 0L, 0L, 1L, 99L));
622+
assertThat(pendingBulkShardRequests, hasSize(1));
623+
assertThat("continue the partial request", shardChangesRequests, hasSize(3));
624+
assertThat(shardChangesRequests.get(2)[0], equalTo(6L));
625+
assertThat(shardChangesRequests.get(2)[1], equalTo(4L));
626+
assertThat(pendingBulkShardRequests, hasSize(1));
627+
task.innerHandleReadResponse(10, 19L, generateShardChangesResponse(10L, 17L, 0L, 0L, 1L, 99L));
628+
assertThat("do not continue partial reads as the buffer is full", shardChangesRequests, hasSize(3));
629+
task.innerHandleReadResponse(6L, 9L, generateShardChangesResponse(6L, 8L, 0L, 0L, 1L, 99L));
630+
assertThat("do not continue partial reads as the buffer is full", shardChangesRequests, hasSize(3));
631+
pendingBulkShardRequests.remove().onResponse(new BulkShardOperationsResponse());
632+
assertThat(pendingBulkShardRequests, hasSize(1));
633+
634+
assertThat("continue two partial requests as the buffer is empty after sending", shardChangesRequests, hasSize(5));
635+
assertThat(shardChangesRequests.get(3)[0], equalTo(9L));
636+
assertThat(shardChangesRequests.get(3)[1], equalTo(1L));
637+
assertThat(shardChangesRequests.get(4)[0], equalTo(18L));
638+
assertThat(shardChangesRequests.get(4)[1], equalTo(2L));
639+
640+
task.innerHandleReadResponse(18L, 19L, generateShardChangesResponse(18L, 19L, 0L, 0L, 1L, 99L));
641+
assertThat("start new range as the buffer has empty slots", shardChangesRequests, hasSize(6));
642+
assertThat(shardChangesRequests.get(5)[0], equalTo(20L));
643+
assertThat(shardChangesRequests.get(5)[1], equalTo(10L));
644+
645+
task.innerHandleReadResponse(9L, 9L, generateShardChangesResponse(9L, 9L, 0L, 0L, 1L, 99L));
646+
assertThat("do not start new range as the buffer is full", shardChangesRequests, hasSize(6));
647+
pendingBulkShardRequests.remove().onResponse(new BulkShardOperationsResponse());
648+
assertThat("start new range as the buffer is empty after sending", shardChangesRequests, hasSize(7));
649+
assertThat(shardChangesRequests.get(6)[0], equalTo(30L));
650+
assertThat(shardChangesRequests.get(6)[1], equalTo(10L));
651+
}
652+
602653
public void testMappingUpdate() {
603654
ShardFollowTaskParams params = new ShardFollowTaskParams();
604655
params.maxReadRequestOperationCount = 64;
@@ -996,7 +1047,7 @@ public void testMaxWriteRequestSize() {
9961047

9971048
ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 64L);
9981049
// Also invokes coordinatesWrites()
999-
task.innerHandleReadResponse(0L, 64L, response);
1050+
task.innerHandleReadResponse(0L, 63L, response);
10001051

10011052
assertThat(bulkShardOperationRequests.size(), equalTo(64));
10021053
}
@@ -1122,6 +1173,7 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params)
11221173
followerGlobalCheckpoints = new LinkedList<>();
11231174
maxSeqNos = new LinkedList<>();
11241175
responseSizes = new LinkedList<>();
1176+
pendingBulkShardRequests = new LinkedList<>();
11251177
return new ShardFollowNodeTask(
11261178
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {
11271179

@@ -1185,6 +1237,8 @@ protected void innerSendBulkShardOperationsRequest(
11851237
response.setGlobalCheckpoint(followerGlobalCheckpoint);
11861238
response.setMaxSeqNo(followerGlobalCheckpoint);
11871239
handler.accept(response);
1240+
} else {
1241+
pendingBulkShardRequests.add(ActionListener.wrap(handler::accept, errorHandler));
11881242
}
11891243
}
11901244

0 commit comments

Comments
 (0)