Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,21 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
// as the mapping version that these operations used. Here we must not use the IndexMetaData from ClusterService because a new
// cluster state is exposed to the ClusterApplier(s) before it is exposed in the ClusterService.
final IndexMetaData indexMetaData = indexService.getMetaData();
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
return getResponse(
mappingVersion,
settingsVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void start(
}

// updates the follower mapping; this gets us the leader mapping version and makes sure that the leader and follower mappings are identical
updateMapping(followerMappingVersion -> {
updateMapping(0L, followerMappingVersion -> {
synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = followerMappingVersion;
}
Expand Down Expand Up @@ -378,7 +378,7 @@ private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion,
} else {
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
updateMapping(mappingVersion -> {
updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
currentMappingVersion = mappingVersion;
task.run();
});
Expand All @@ -400,12 +400,13 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
}
}

private void updateMapping(LongConsumer handler) {
updateMapping(handler, new AtomicInteger(0));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) {
updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0));
}

private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(minRequiredMappingVersion, handler,
e -> handleFailure(e, retryCounter, () -> updateMapping(minRequiredMappingVersion, handler, retryCounter)));
}

private void updateSettings(final LongConsumer handler) {
Expand Down Expand Up @@ -468,7 +469,7 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
}

// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
protected abstract void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
// the returned mapping is outdated - retry again
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we had the metadata version (which is also updated whenever the index metadata or mapping changes), could we just do a waitForMetaDataVersion? That would avoid a possible busy loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this retry in 7.0 after backporting to 6.x

innerUpdateMapping(minRequiredMappingVersion, handler, errorHandler);
return;
}
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,42 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
import org.hamcrest.Matchers;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class FollowerFailOverIT extends CcrIntegTestCase {

Expand Down Expand Up @@ -220,4 +230,54 @@ public void testAddNewReplicasOnFollower() throws Exception {
pauseFollow("follower-index");
}

/**
 * Checks that a follower can fully replicate a leader index while the leader data node is held in the middle of
 * applying a cluster state that carries a new mapping.
 *
 * <p>Outline: a leader index is pinned to a dedicated data node, a cluster state applier on that node blocks as soon
 * as it sees the {@code balance} field in the index mapping, a document using that field is indexed, and a follower
 * is then started. The test waits until the follower has read some bytes, only then releases the blocked applier,
 * and finally asserts that the follower index matches the leader index.
 *
 * @throws Exception if any cluster operation or assertion helper fails
 */
public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
InternalTestCluster leaderCluster = getLeaderCluster();
// Dedicated leader data node tagged with box=large; the index below is required to allocate on such a node.
Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build();
String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes);
// Single-shard, zero-replica leader index with soft deletes enabled, restricted to nodes with box=large.
assertAcked(
leaderClient().admin().indices().prepareCreate("leader-index")
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")
.put("index.routing.allocation.require.box", "large"))
.get()
);
// Grab the ClusterService, shard id and IndexShard of the leader shard from the dedicated data node.
ClusterService clusterService = leaderCluster.clusterService(dataNode);
ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId();
IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode);
IndexShard indexShard = indicesService.getShardOrNull(shardId);
// Released near the end of the test to unblock the applier registered below.
final CountDownLatch latch = new CountDownLatch(1);
// Low-priority applier that blocks on the latch once the applied state contains a mapping for the "balance" field.
// NOTE(review): presumably low-priority appliers run after the appliers that install the mapping on the shard,
// so the shard already sees the new mapping while clusterService.state() still returns the old state - confirm.
clusterService.addLowPriorityApplier(event -> {
IndexMetaData imd = event.state().metaData().index("leader-index");
if (imd != null && imd.mapping() != null &&
XContentMapValues.extractValue("properties.balance.type", imd.mapping().sourceAsMap()) != null) {
try {
logger.info("--> block ClusterService from exposing new mapping version");
latch.await();
} catch (Exception e) {
throw new AssertionError(e);
}
}
});
// Introduce the "balance" field; this is the mapping change that triggers the blocking applier above.
// NOTE(review): the zero timeout presumably lets this call return without waiting for the blocked node - confirm.
leaderCluster.client().admin().indices().preparePutMapping().setType("doc")
.setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get();
// Index one document that uses the new field, sent through the dedicated data node.
IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1")
.setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
// Sanity checks: the document was created and the leader shard's global checkpoint is 0.
assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L));
// Start a follower data node with the same attributes and begin following while the applier is still blocked.
getFollowerCluster().startDataOnlyNode(nodeAttributes);
followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get();
ensureFollowerGreen("follower-index");
// Wait until the follow stats report a non-zero number of bytes read, summed over all stats responses.
assertBusy(() -> {
CcrClient ccrClient = new CcrClient(followerClient());
FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet();
long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum();
assertThat(bytesRead, Matchers.greaterThan(0L));
});
// Unblock the applier, then verify full replication and clean up the follow task.
latch.countDown();
assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
pauseFollow("follower-index");
ensureNoCcrTasks();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR
private final Map<Long, Integer> fromToSlot = new HashMap<>();

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
handler.accept(mappingVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ private ShardFollowNodeTask createShardFollowTask(ShardFollowTaskParams params)
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
Exception failure = mappingUpdateFailures.poll();
if (failure != null) {
errorHandler.accept(failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ protected synchronized void onOperationsFetched(Translog.Operation[] operations)
}

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
// noop, as mapping updates are not tested
handler.accept(1L);
}
Expand Down