Skip to content

Commit 4c26292

Browse files
committed
return metadata_version
1 parent ba0df59 commit 4c26292

File tree

6 files changed

+53
-6
lines changed

6 files changed

+53
-6
lines changed

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
121121
private final List<IndexEventListener> buildInIndexListener;
122122
private final PrimaryReplicaSyncer primaryReplicaSyncer;
123123
private final Consumer<ShardId> globalCheckpointSyncer;
124+
private volatile ClusterState clusterState; // the latest applied cluster state
124125

125126
@Inject
126127
public IndicesClusterStateService(
@@ -229,6 +230,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
229230
return;
230231
}
231232

233+
this.clusterState = state;
232234
updateFailedShardsCache(state);
233235

234236
deleteIndices(event); // also deletes shards of deleted indices
@@ -246,6 +248,13 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
246248
createOrUpdateShards(state);
247249
}
248250

251+
/**
252+
* Returns the latest applied cluster state
253+
*/
254+
public ClusterState getClusterState() {
255+
return clusterState;
256+
}
257+
249258
/**
250259
* Removes shard entries from the failed shards cache that are no longer allocated to this node by the master.
251260
* Sends shard failures for shards that are marked as actively allocated to this node but don't actually exist on the node.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.logging.log4j.message.ParameterizedMessage;
99
import org.elasticsearch.ElasticsearchException;
1010
import org.elasticsearch.ExceptionsHelper;
11+
import org.elasticsearch.Version;
1112
import org.elasticsearch.action.Action;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionRequestValidationException;
@@ -18,6 +19,7 @@
1819
import org.elasticsearch.cluster.ClusterState;
1920
import org.elasticsearch.cluster.metadata.IndexMetaData;
2021
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.metadata.MetaData;
2123
import org.elasticsearch.cluster.routing.ShardsIterator;
2224
import org.elasticsearch.cluster.service.ClusterService;
2325
import org.elasticsearch.common.inject.Inject;
@@ -34,6 +36,7 @@
3436
import org.elasticsearch.index.shard.ShardId;
3537
import org.elasticsearch.index.translog.Translog;
3638
import org.elasticsearch.indices.IndicesService;
39+
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
3740
import org.elasticsearch.tasks.Task;
3841
import org.elasticsearch.threadpool.ThreadPool;
3942
import org.elasticsearch.transport.TransportService;
@@ -212,6 +215,12 @@ public long getSettingsVersion() {
212215
return settingsVersion;
213216
}
214217

218+
private long metadataVersion;
219+
220+
public long getMetadataVersion() {
221+
return metadataVersion;
222+
}
223+
215224
private long globalCheckpoint;
216225

217226
public long getGlobalCheckpoint() {
@@ -248,6 +257,7 @@ public long getTookInMillis() {
248257
Response(
249258
final long mappingVersion,
250259
final long settingsVersion,
260+
final long metadataVersion,
251261
final long globalCheckpoint,
252262
final long maxSeqNo,
253263
final long maxSeqNoOfUpdatesOrDeletes,
@@ -256,6 +266,7 @@ public long getTookInMillis() {
256266

257267
this.mappingVersion = mappingVersion;
258268
this.settingsVersion = settingsVersion;
269+
this.metadataVersion = metadataVersion;
259270
this.globalCheckpoint = globalCheckpoint;
260271
this.maxSeqNo = maxSeqNo;
261272
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
@@ -268,6 +279,11 @@ public void readFrom(final StreamInput in) throws IOException {
268279
super.readFrom(in);
269280
mappingVersion = in.readVLong();
270281
settingsVersion = in.readVLong();
282+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
283+
metadataVersion = in.readVLong();
284+
} else {
285+
metadataVersion = 0L;
286+
}
271287
globalCheckpoint = in.readZLong();
272288
maxSeqNo = in.readZLong();
273289
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
@@ -280,6 +296,9 @@ public void writeTo(final StreamOutput out) throws IOException {
280296
super.writeTo(out);
281297
out.writeVLong(mappingVersion);
282298
out.writeVLong(settingsVersion);
299+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
300+
out.writeVLong(metadataVersion);
301+
}
283302
out.writeZLong(globalCheckpoint);
284303
out.writeZLong(maxSeqNo);
285304
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
@@ -294,6 +313,7 @@ public boolean equals(final Object o) {
294313
final Response that = (Response) o;
295314
return mappingVersion == that.mappingVersion &&
296315
settingsVersion == that.settingsVersion &&
316+
metadataVersion == that.metadataVersion &&
297317
globalCheckpoint == that.globalCheckpoint &&
298318
maxSeqNo == that.maxSeqNo &&
299319
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
@@ -306,6 +326,7 @@ public int hashCode() {
306326
return Objects.hash(
307327
mappingVersion,
308328
settingsVersion,
329+
metadataVersion,
309330
globalCheckpoint,
310331
maxSeqNo,
311332
maxSeqNoOfUpdatesOrDeletes,
@@ -317,17 +338,20 @@ public int hashCode() {
317338
public static class TransportAction extends TransportSingleShardAction<Request, Response> {
318339

319340
private final IndicesService indicesService;
341+
private final IndicesClusterStateService indicesClusterStateService;
320342

321343
@Inject
322344
public TransportAction(ThreadPool threadPool,
323345
ClusterService clusterService,
324346
TransportService transportService,
325347
ActionFilters actionFilters,
326348
IndexNameExpressionResolver indexNameExpressionResolver,
327-
IndicesService indicesService) {
349+
IndicesService indicesService,
350+
IndicesClusterStateService indicesClusterStateService) {
328351
super(NAME, threadPool, clusterService, transportService, actionFilters,
329352
indexNameExpressionResolver, Request::new, ThreadPool.Names.SEARCH);
330353
this.indicesService = indicesService;
354+
this.indicesClusterStateService = indicesClusterStateService;
331355
}
332356

333357
@Override
@@ -347,12 +371,14 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
347371
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
348372
// as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose
349373
// a new cluster state to ClusterApplier(s) before exposing it in the ClusterService.
350-
final IndexMetaData indexMetaData = indexService.getMetaData();
374+
final MetaData metaData = indicesClusterStateService.getClusterState().metaData();
375+
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
351376
final long mappingVersion = indexMetaData.getMappingVersion();
352377
final long settingsVersion = indexMetaData.getSettingsVersion();
353378
return getResponse(
354379
mappingVersion,
355380
settingsVersion,
381+
metaData.version(),
356382
seqNoStats,
357383
maxSeqNoOfUpdatesOrDeletes,
358384
operations,
@@ -432,7 +458,8 @@ private void globalCheckpointAdvancementFailure(
432458
e);
433459
if (e instanceof TimeoutException) {
434460
try {
435-
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
461+
final MetaData metaData = indicesClusterStateService.getClusterState().metaData();
462+
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
436463
final long mappingVersion = indexMetaData.getMappingVersion();
437464
final long settingsVersion = indexMetaData.getSettingsVersion();
438465
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
@@ -441,6 +468,7 @@ private void globalCheckpointAdvancementFailure(
441468
getResponse(
442469
mappingVersion,
443470
settingsVersion,
471+
metaData.version(),
444472
latestSeqNoStats,
445473
maxSeqNoOfUpdatesOrDeletes,
446474
EMPTY_OPERATIONS_ARRAY,
@@ -532,6 +560,7 @@ static Translog.Operation[] getOperations(
532560
static Response getResponse(
533561
final long mappingVersion,
534562
final long settingsVersion,
563+
final long metadataVersion,
535564
final SeqNoStats seqNoStats,
536565
final long maxSeqNoOfUpdates,
537566
final Translog.Operation[] operations,
@@ -541,6 +570,7 @@ static Response getResponse(
541570
return new Response(
542571
mappingVersion,
543572
settingsVersion,
573+
metadataVersion,
544574
seqNoStats.getGlobalCheckpoint(),
545575
seqNoStats.getMaxSeqNo(),
546576
maxSeqNoOfUpdates,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
1414
protected ShardChangesAction.Response createTestInstance() {
1515
final long mappingVersion = randomNonNegativeLong();
1616
final long settingsVersion = randomNonNegativeLong();
17+
final long metadataVersion = randomNonNegativeLong();
1718
final long leaderGlobalCheckpoint = randomNonNegativeLong();
1819
final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
1920
final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(-1, Long.MAX_VALUE);
@@ -25,6 +26,7 @@ protected ShardChangesAction.Response createTestInstance() {
2526
return new ShardChangesAction.Response(
2627
mappingVersion,
2728
settingsVersion,
29+
metadataVersion,
2830
leaderGlobalCheckpoint,
2931
leaderMaxSeqNo,
3032
maxSeqNoOfUpdatesOrDeletes,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
170170
assert from >= testRun.finalExpectedGlobalCheckpoint;
171171
final long globalCheckpoint = tracker.getCheckpoint();
172172
final long maxSeqNo = tracker.getMaxSeqNo();
173-
handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(),
173+
handler.accept(new ShardChangesAction.Response(0L, 0L, 1L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(),
174174
new Translog.Operation[0], 1L));
175175
}
176176
};
@@ -249,6 +249,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
249249
new ShardChangesAction.Response(
250250
mappingVersion,
251251
settingsVersion,
252+
1L,
252253
nextGlobalCheckPoint,
253254
nextGlobalCheckPoint,
254255
randomNonNegativeLong(),
@@ -274,6 +275,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
274275
ShardChangesAction.Response response = new ShardChangesAction.Response(
275276
mappingVersion,
276277
settingsVersion,
278+
1L,
277279
prevGlobalCheckpoint,
278280
prevGlobalCheckpoint,
279281
randomNonNegativeLong(),
@@ -293,6 +295,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion,
293295
ShardChangesAction.Response response = new ShardChangesAction.Response(
294296
mappingVersion,
295297
settingsVersion,
298+
1L,
296299
localLeaderGCP,
297300
localLeaderGCP,
298301
randomNonNegativeLong(),

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ public void testReceiveNothingExpectedSomething() {
526526
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
527527

528528
shardChangesRequests.clear();
529-
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
529+
task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 1L, 0, 0, 100, new Translog.Operation[0], 1L));
530530

531531
assertThat(shardChangesRequests.size(), equalTo(1));
532532
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@@ -1022,6 +1022,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
10221022
final ShardChangesAction.Response response = new ShardChangesAction.Response(
10231023
mappingVersions.poll(),
10241024
0L,
1025+
1L,
10251026
leaderGlobalCheckpoints.poll(),
10261027
maxSeqNos.poll(),
10271028
randomNonNegativeLong(),
@@ -1058,6 +1059,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro
10581059
return new ShardChangesAction.Response(
10591060
mappingVersion,
10601061
settingsVersion,
1062+
1L,
10611063
leaderGlobalCheckPoint,
10621064
leaderGlobalCheckPoint,
10631065
randomNonNegativeLong(),

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,14 +437,15 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
437437
final SeqNoStats seqNoStats = indexShard.seqNoStats();
438438
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
439439
if (from > seqNoStats.getGlobalCheckpoint()) {
440-
handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats,
440+
handler.accept(ShardChangesAction.getResponse(1L, 1L, 1L, seqNoStats,
441441
maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
442442
return;
443443
}
444444
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
445445
maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize());
446446
// hard code mapping version; this is ok, as mapping updates are not tested here
447447
final ShardChangesAction.Response response = new ShardChangesAction.Response(
448+
1L,
448449
1L,
449450
1L,
450451
seqNoStats.getGlobalCheckpoint(),

0 commit comments

Comments
 (0)