Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,21 @@ synchronized void coordinateReads() {
params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint);
final int maxBatchOperationCount = params.getMaxBatchOperationCount();
while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) {
final long from = lastRequestedSeqno + 1;
final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount - 1);
final int requestBatchCount;
if (numConcurrentReads == 0) {
// If this is the only request, we can treat it as a peek read.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add "add let it optimistically fetch more documents if possible (but not require it)"?

requestBatchCount = maxBatchOperationCount;
} else {
requestBatchCount = Math.toIntExact(maxRequiredSeqNo - from + 1);
}
assert 0 < requestBatchCount && requestBatchCount <= maxBatchOperationCount : "request_batch_count=" + requestBatchCount;
LOGGER.trace("{}[{} ongoing reads] read from_seqno={} max_required_seqno={} batch_count={}",
params.getFollowShardId(), numConcurrentReads, from, maxRequiredSeqNo, requestBatchCount);
numConcurrentReads++;
long from = lastRequestedSeqno + 1;
// -1 is needed, because maxRequiredSeqno is inclusive
long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, (from + maxBatchOperationCount) - 1);
LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount);
sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno);
lastRequestedSeqno = maxRequiredSeqno;
sendShardChangesRequest(from, requestBatchCount, maxRequiredSeqNo);
lastRequestedSeqno = maxRequiredSeqNo;
}

if (numConcurrentReads == 0 && hasReadBudget()) {
Expand Down Expand Up @@ -186,7 +194,13 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
}

/** Called when some operations are fetched from the leading */
protected void onOperationsFetched(Translog.Operation[] operations) {

}

synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
onOperationsFetched(response.getOperations());
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
final long newFromSeqNo;
if (response.getOperations().length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {

public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testFollowIndex() throws Exception {
}

public void testSyncMappings() throws Exception {
final String leaderIndexSettings = getIndexSettings(2,
final String leaderIndexSettings = getIndexSettings(2, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");
Expand Down Expand Up @@ -255,7 +255,8 @@ public void testSyncMappings() throws Exception {
}

public void testFollowIndex_backlog() throws Exception {
String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
String leaderIndexSettings = getIndexSettings(between(1, 5), between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
Expand Down Expand Up @@ -306,10 +307,10 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)

public void testFollowIndexAndCloseNode() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));

String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
String followerIndexSettings = getIndexSettings(3, 1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
ensureGreen("index1", "index2");

Expand Down Expand Up @@ -366,13 +367,14 @@ public void testFollowIndexAndCloseNode() throws Exception {

public void testFollowIndexWithNestedField() throws Exception {
final String leaderIndexSettings =
getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));

final String followerIndexSettings =
getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));

internalCluster().ensureAtLeastNumDataNodes(2);
ensureGreen("index1", "index2");

final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
Expand Down Expand Up @@ -455,7 +457,8 @@ public void testValidateFollowingIndexSettings() throws Exception {
}

public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");

Expand Down Expand Up @@ -554,15 +557,16 @@ private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int valu
};
}

private String getIndexSettings(final int numberOfPrimaryShards, final Map<String, String> additionalIndexSettings) throws IOException {
private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("settings");
{
builder.field("index.number_of_shards", numberOfPrimaryShards);
builder.field("index.number_of_replicas", 1);
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
}
Expand Down Expand Up @@ -592,15 +596,16 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map<Strin
return settings;
}

private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards,
private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException {
final String settings;
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("settings");
{
builder.field("index.number_of_shards", numberOfPrimaryShards);
builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testSingleReaderWriter() throws Exception {

public void testMultipleReaderWriter() throws Exception {
int concurrency = randomIntBetween(2, 8);
TestRun testRun = createTestRun(0, 0, 1024);
TestRun testRun = createTestRun(0, 0, between(1, 1024));
ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
startAndAssertAndStopTask(task, testRun);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
private Queue<Long> followerGlobalCheckpoints;

public void testCoordinateReads() {
ShardFollowNodeTask task = createShardFollowTask(8, 8, 8, Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 64, -1);

ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
startTask(task, 3, -1);
task.coordinateReads();
assertThat(shardChangesRequests.size(), equalTo(8));
assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
shardChangesRequests.clear();
task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L));
assertThat(shardChangesRequests, contains(new long[][]{
{0L, 8L}, {8L, 8L}, {16L, 8L}, {24L, 8L}, {32L, 8L}, {40L, 8L}, {48L, 8L}, {56L, 8L}}
{6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
));

ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(8));
assertThat(status.getLastRequestedSeqno(), equalTo(63L));
assertThat(status.getNumberOfConcurrentReads(), equalTo(7));
assertThat(status.getLastRequestedSeqno(), equalTo(60L));
}

public void testWriteBuffer() {
Expand Down Expand Up @@ -263,12 +263,12 @@ public void testReceiveLessThanRequested() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
task.innerHandleReadResponse(0L, 64L, response);
ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L);
task.innerHandleReadResponse(0L, 63L, response);

assertThat(shardChangesRequests.size(), equalTo(1));
assertThat(shardChangesRequests.get(0)[0], equalTo(32L));
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
assertThat(shardChangesRequests.get(0)[0], equalTo(21L));
assertThat(shardChangesRequests.get(0)[1], equalTo(43L));

ShardFollowNodeTask.Status status = task.getStatus();
assertThat(status.getNumberOfConcurrentReads(), equalTo(1));
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testReceiveNothingExpectedSomething() {
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));

shardChangesRequests.clear();
task.innerHandleReadResponse(0L, 64L,
task.innerHandleReadResponse(0L, 63L,
new ShardChangesAction.Response(0, 0, new Translog.Operation[0]));

assertThat(shardChangesRequests.size(), equalTo(1));
Expand Down Expand Up @@ -675,9 +675,9 @@ protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> oper
}

@Override
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
protected void innerSendShardChangesRequest(long from, int requestBatchSize, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler) {
shardChangesRequests.add(new long[]{from, maxBatchOperationCount});
shardChangesRequests.add(new long[]{from, requestBatchSize});
Exception readFailure = ShardFollowNodeTaskTests.this.readFailures.poll();
if (readFailure != null) {
errorHandler.accept(readFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ccr.action;

import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongSet;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
Expand Down Expand Up @@ -72,6 +74,7 @@ public void testSimpleCcrReplication() throws Exception {
followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size());
});
shardFollowTask.markAsCompleted();
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
}
}

Expand Down Expand Up @@ -107,6 +110,7 @@ public void testFailLeaderReplicaShard() throws Exception {
leaderGroup.assertAllEqual(docCount);
assertBusy(() -> followerGroup.assertAllEqual(docCount));
shardFollowTask.markAsCompleted();
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
}
}

Expand Down Expand Up @@ -141,12 +145,23 @@ private ReplicationGroup createFollowGroup(int replicas) throws IOException {

private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240,
new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240,
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());

BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
AtomicBoolean stopped = new AtomicBoolean(false);
LongSet fetchOperations = new LongHashSet();
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
@Override
protected synchronized void onOperationsFetched(Translog.Operation[] operations) {
super.onOperationsFetched(operations);
for (Translog.Operation operation : operations) {
if (fetchOperations.add(operation.seqNo()) == false) {
throw new AssertionError("Operation [" + operation + " ] was fetched already");
}
}
}

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
// noop, as mapping updates are not tested
Expand Down Expand Up @@ -210,6 +225,13 @@ public void markAsFailed(Exception e) {
};
}

private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws IOException {
int totalOps = leader.getPrimary().estimateNumberOfHistoryOperations("test", 0);
for (IndexShard followingShard : follower) {
assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps));
}
}

class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {

CCRAction(BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener, ReplicationGroup group) {
Expand Down