diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index e868da5e82ac6..b406621e978da 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.OptionalLong; import java.util.Set; import java.util.function.Function; @@ -127,6 +128,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ final Map checkpoints; + /** + * A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on + * the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances + * due to an update from the primary. + */ + private final LongConsumer onGlobalCheckpointUpdated; + /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the * current global checkpoint. @@ -391,7 +399,8 @@ public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, - final long globalCheckpoint) { + final long globalCheckpoint, + final LongConsumer onGlobalCheckpointUpdated) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -400,6 +409,7 @@ public ReplicationTracker( this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); + this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; @@ -456,7 +466,10 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp updateGlobalCheckpoint( shardAllocationId, globalCheckpoint, - current -> logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason)); + current -> { + logger.trace("updated global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason); + onGlobalCheckpointUpdated.accept(globalCheckpoint); + }); assert invariant(); } @@ -474,7 +487,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI allocationId, globalCheckpoint, current -> logger.trace( - "updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", + "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", allocationId, current, globalCheckpoint)); @@ -485,8 +498,8 @@ private void updateGlobalCheckpoint(final String allocationId, final long global final CheckpointState cps = checkpoints.get(allocationId); assert !this.shardAllocationId.equals(allocationId) || cps != null; if (cps != null && globalCheckpoint > cps.globalCheckpoint) { - ifUpdated.accept(cps.globalCheckpoint); cps.globalCheckpoint = globalCheckpoint; + ifUpdated.accept(cps.globalCheckpoint); } } @@ -737,8 +750,9 @@ private synchronized void updateGlobalCheckpointOnPrimary() { assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint + "] is lower than previous one [" + globalCheckpoint + "]"; if (globalCheckpoint != computedGlobalCheckpoint) { - logger.trace("global checkpoint updated to [{}]", computedGlobalCheckpoint); cps.globalCheckpoint = computedGlobalCheckpoint; + logger.trace("updated global checkpoint to [{}]", computedGlobalCheckpoint); + onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9131055bcd928..17b39274c0a3d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -297,8 +297,9 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); - this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings, - SequenceNumbers.UNASSIGNED_SEQ_NO); + final String aId = shardRouting.allocationId().getId(); + this.replicationTracker = + new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {}); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index e001f82809b07..3948da9c1119c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -47,7 +47,9 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -60,7 +62,7 @@ import static org.hamcrest.Matchers.not; public class ReplicationTrackerTests extends ESTestCase { - + public void testEmptyShards() { final ReplicationTracker tracker = newTracker(AllocationId.newInitializing()); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -99,6 +101,11 @@ private static Set ids(Set allocationIds) { return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); } + private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { + tracker.updateLocalCheckpoint(allocationId, localCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())); + } + public void testGlobalCheckpointUpdate() { final long initialClusterStateVersion = randomNonNegativeLong(); Map allocations = new HashMap<>(); @@ -137,14 +144,14 @@ public void testGlobalCheckpointUpdate() { assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1)); initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size())); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); + allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId))); assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint)); // increment checkpoints active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); + allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId))); final long minLocalCheckpointAfterUpdates = allocations.entrySet().stream().map(Map.Entry::getValue).min(Long::compareTo).orElse(UNASSIGNED_SEQ_NO); @@ -153,7 +160,7 @@ public void testGlobalCheckpointUpdate() { final AllocationId extraId = AllocationId.newInitializing(); // first check that adding it without the master blessing doesn't change anything. - tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); assertNull(tracker.checkpoints.get(extraId)); expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(extraId.getId())); @@ -165,7 +172,7 @@ public void testGlobalCheckpointUpdate() { // now notify for the new id if (randomBoolean()) { - tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates)); } else { markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); @@ -175,6 +182,64 @@ public void testGlobalCheckpointUpdate() { assertThat(tracker.getGlobalCheckpoint(), greaterThan(minLocalCheckpoint)); } + public void testUpdateGlobalCheckpointOnReplica() { + final AllocationId active = AllocationId.newInitializing(); + final ReplicationTracker tracker = newTracker(active); + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1); + tracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(globalCheckpoint)); + final long nonUpdate = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint); + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); + tracker.updateGlobalCheckpointOnReplica(nonUpdate, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + final long update = randomLongBetween(globalCheckpoint, Long.MAX_VALUE); + tracker.updateGlobalCheckpointOnReplica(update, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(update)); + } + + public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, InterruptedException { + final long initialClusterStateVersion = randomNonNegativeLong(); + Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); + Set active = new HashSet<>(activeWithCheckpoints.keySet()); + Map initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); + Set initializing = new HashSet<>(initializingWithCheckpoints.keySet()); + final AllocationId primaryId = active.iterator().next(); + final AllocationId replicaId = initializing.iterator().next(); + final ReplicationTracker tracker = newTracker(primaryId); + tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet()); + final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); + tracker.activatePrimaryMode(localCheckpoint); + tracker.initiateTracking(replicaId.getId()); + final CyclicBarrier barrier = new CyclicBarrier(2); + final Thread thread = new Thread(() -> { + try { + barrier.await(); + tracker.markAllocationIdAsInSync( + replicaId.getId(), + randomLongBetween(NO_OPS_PERFORMED, localCheckpoint - 1)); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + }); + thread.start(); + barrier.await(); + awaitBusy(tracker::pendingInSync); + final long updatedLocalCheckpoint = randomLongBetween(1 + localCheckpoint, Long.MAX_VALUE); + // there is a shard copy pending in sync, the global checkpoint can not advance + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); + tracker.updateLocalCheckpoint(primaryId.getId(), updatedLocalCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + // we are implicitly marking the pending in sync copy as in sync with the current global checkpoint, no advancement should occur + tracker.updateLocalCheckpoint(replicaId.getId(), localCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + barrier.await(); + thread.join(); + // now we expect that the global checkpoint would advance + tracker.markAllocationIdAsInSync(replicaId.getId(), updatedLocalCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(updatedLocalCheckpoint)); + } + public void testMissingActiveIdsPreventAdvance() { final Map active = randomAllocationsWithLocalCheckpoints(2, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(0, 5); @@ -191,14 +256,16 @@ public void testMissingActiveIdsPreventAdvance() { .entrySet() .stream() .filter(e -> !e.getKey().equals(missingActiveID)) - .forEach(e -> tracker.updateLocalCheckpoint(e.getKey().getId(), e.getValue())); + .forEach(e -> updateLocalCheckpoint(tracker, e.getKey().getId(), e.getValue())); if (missingActiveID.equals(primaryId) == false) { assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); } // now update all knowledge of all shards - assigned.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + assigned.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testMissingInSyncIdsPreventAdvance() { @@ -213,13 +280,15 @@ public void testMissingInSyncIdsPreventAdvance() { randomSubsetOf(randomIntBetween(1, initializing.size() - 1), initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); - active.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + active.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), equalTo(NO_OPS_PERFORMED)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(NO_OPS_PERFORMED)); // update again - initializing.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + initializing.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { @@ -236,7 +305,7 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { List> allocations = Arrays.asList(active, initializing, nonApproved); Collections.shuffle(allocations, random()); - allocations.forEach(a -> a.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP))); + allocations.forEach(a -> a.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP))); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); } @@ -271,7 +340,7 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { initializing.forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); } if (randomBoolean()) { - allocations.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + allocations.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); } // now remove shards @@ -281,9 +350,9 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { ids(activeToStay.keySet()), routingTable(initializingToStay.keySet(), primaryId), emptySet()); - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L)); + allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L)); } else { - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L)); + allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L)); tracker.updateFromMaster( initialClusterStateVersion + 2, ids(activeToStay.keySet()), @@ -331,7 +400,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { final List elements = IntStream.rangeClosed(0, globalCheckpoint - 1).boxed().collect(Collectors.toList()); Randomness.shuffle(elements); for (int i = 0; i < elements.size(); i++) { - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), elements.get(i)); + updateLocalCheckpoint(tracker, trackingAllocationId.getId(), elements.get(i)); assertFalse(complete.get()); assertFalse(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync); assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId()))); @@ -339,7 +408,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { if (randomBoolean()) { // normal path, shard catches up - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); + updateLocalCheckpoint(tracker, trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); // synchronize with the waiting thread to mark that it is complete barrier.await(); assertTrue(complete.get()); @@ -355,13 +424,16 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId())); thread.join(); } + + private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); private ReplicationTracker newTracker(final AllocationId allocationId) { return new ReplicationTracker( new ShardId("test", "_na_", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - UNASSIGNED_SEQ_NO); + UNASSIGNED_SEQ_NO, + updatedGlobalCheckpoint::set); } public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { @@ -488,10 +560,10 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { // the tracking allocation IDs should play no role in determining the global checkpoint final Map activeLocalCheckpoints = newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); - activeLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l)); + activeLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l)); final Map initializingLocalCheckpoints = newInitializingAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); - initializingLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l)); + initializingLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l)); assertTrue( activeLocalCheckpoints .entrySet() @@ -504,6 +576,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { .allMatch(e -> tracker.getTrackedLocalCheckpointForShard(e.getKey().getId()).getLocalCheckpoint() == e.getValue())); final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get(); assertThat(tracker.getGlobalCheckpoint(), equalTo(minimumActiveLocalCheckpoint)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(minimumActiveLocalCheckpoint)); final long minimumInitailizingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get(); // now we are going to add a new allocation ID and bring it in sync which should move it to the in-sync allocation IDs @@ -635,10 +708,11 @@ public void testPrimaryContextHandoff() throws IOException { FakeClusterState clusterState = initialState(); final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); + final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; ReplicationTracker oldPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO); + new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); ReplicationTracker newPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO); + new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f26522245493f..2a84a8f424616 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -460,7 +460,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? - new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) : + new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : globalCheckpointSupplier, primaryTerm::get); return config; }