-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Introduce primary context #25122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce primary context #25122
Changes from 14 commits
d687f51
8d871cf
a197a64
1e4255c
c1588bb
0b47386
73bc0d1
76162e4
08bb6fa
a799e69
58adfa0
03b863b
40a3fcc
8504b82
5f05d92
a35e497
b54a8d6
48157cd
9a58ff4
62966c5
da4c9aa
a4dda93
ef1d345
51ad65b
e3f1886
4ba8d5c
48572a6
c9bd0d7
acca222
d459f16
4471dc2
56f3c17
f97fd92
29ca82a
d7a8021
9683757
d9dda28
5422f6e
d04f14a
99597d1
52af334
e1de296
643fa8a
4e1fa9c
a275167
00e4083
5f16884
3a53fd3
ccde798
e6bbe8b
ff54eec
6ea61ef
f6f6acb
de862cd
dc377fa
077b3f4
4e05714
b30dbcc
744b03a
3c2731f
e19c8b3
0d33a88
7bd3c1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,10 +23,12 @@ | |
| import com.carrotsearch.hppc.ObjectLongMap; | ||
| import com.carrotsearch.hppc.cursors.ObjectLongCursor; | ||
| import org.elasticsearch.common.SuppressForbidden; | ||
| import org.elasticsearch.common.collect.HppcMaps; | ||
| import org.elasticsearch.index.IndexSettings; | ||
| import org.elasticsearch.index.shard.AbstractIndexShardComponent; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
|
|
||
| import java.util.Arrays; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As does this
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
| import java.util.HashSet; | ||
| import java.util.Locale; | ||
| import java.util.Set; | ||
|
|
@@ -248,6 +250,79 @@ public synchronized void updateAllocationIdsFromMaster( | |
| updateGlobalCheckpointOnPrimary(); | ||
| } | ||
|
|
||
| /** | ||
| * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. | ||
| * | ||
| * @param seqNoPrimaryContext the sequence number context | ||
| */ | ||
| synchronized void updateAllocationIdsFromPrimaryContext(final SeqNoPrimaryContext seqNoPrimaryContext) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is tricky. The master is the one that drives the set of shards that are allocated. If the copy was removed by the master we shouldn't re-add it because of a primary handoff that happens concurrently. I think we should make the primary context be a recovery level thing that uses existing shard API (updateLocalCheckPoint/markAllocationIdAsInSync) |
||
| /* | ||
| * We are gathered here today to witness the relocation handoff transferring knowledge from the relocation source to the relocation | ||
| * target. We need to consider the possibility that the version of the cluster state on the relocation source when the primary | ||
| * context was sampled is different than the version of the cluster state on the relocation target at this exact moment. We define | ||
| * the following values: | ||
| * - version(source) = the cluster state version on the relocation source used to ensure a minimum cluster state version on the | ||
| * relocation target | ||
| * - version(context) = the cluster state version on the relocation source when the primary context was sampled | ||
| * - version(target) = the current cluster state version on the relocation target | ||
| * | ||
| * We know that version(source) <= version(target) and version(context) < version(target), version(context) = version(target), and | ||
| * version(target) < version(context) are all possibilities. | ||
| * | ||
| * The case of version(context) = version(target) causes no issues as in this case the knowledge of the in-sync and initializing | ||
| * shards the target receives from the master will be equal to the knowledge of the in-sync and initializing shards the target | ||
| * receives from the relocation source via the primary context. | ||
| * | ||
| * In the case when version(context) < version(target) or version(target) < version(context), we first consider shards that could be | ||
| * contained in the primary context but not contained in the cluster state applied on the target. | ||
| * | ||
| * Suppose there is such a shard and that it is an in-sync shard. However, marking a shard as in-sync requires an operation permit | ||
| * on the primary shard. Such a permit can not be obtained after the relocation handoff has started as the relocation handoff blocks | ||
| * all operations. Therefore, there can not be such a shard that is marked in-sync. | ||
| * | ||
| * Now consider the case of an initializing shard that is contained in the primary context but not contained in the cluster state | ||
| * applied on the target. | ||
| * | ||
| * If version(context) < version(target) it means that the shard has been removed by a later cluster state update that is already | ||
| * applied on the target and we only need to ensure that we do not add it to the tracking map on the target. The call to | ||
| * GlobalCheckpointTracker#updateLocalCheckpoint(String, long) is a no-op for such shards and this is safe. | ||
| * | ||
| * If version(target) < version(context) it means that the shard has started initializing by a later cluster state update has not | ||
| * yet arrived on the target. However, there is a delay on recoveries before we ensure that version(source) <= version(target). | ||
| * Therefore, such a shard can never initialize from the relocation source and will have to await the handoff completing. As such, | ||
| * these shards are not problematic. | ||
| * | ||
| * Now we consider shards that are contained in the cluster state applied on the target but not contained in the primary context. | ||
| * | ||
| * If version(context) < version(target) it means that the target has learned of an initializing shard that the source is not aware | ||
| * of. As explained above, this initialization can only succeed after the relocation is complete, and only with the target as the | ||
| * source of the recovery. | ||
| * | ||
| * Otherwise, if version(target) < version(context) it only means that the global checkpoint on the target will be held back until a | ||
| * later cluster state update arrives because the target will not learn of the removal until later. | ||
| * | ||
| * In both cases, no calls to update the local checkpoint for such shards will be made. This case is safe too. | ||
| */ | ||
| for (final ObjectLongCursor<String> cursor : seqNoPrimaryContext.inSyncLocalCheckpoints()) { | ||
| updateLocalCheckpoint(cursor.key, cursor.value); | ||
| assert cursor.value >= globalCheckpoint | ||
|
||
| : "local checkpoint [" + cursor.value + "] violates being at least the global checkpoint [" + globalCheckpoint + "]"; | ||
| try { | ||
| markAllocationIdAsInSync(cursor.key, cursor.value); | ||
|
||
| } catch (final InterruptedException e) { | ||
| /* | ||
| * Since the local checkpoint already exceeds the global checkpoint here, we never blocking waiting for advancement. This | ||
| * means that we can never be interrupted. If we are bail, something is catastrophically wrong. | ||
| */ | ||
| throw new AssertionError(e); | ||
| } | ||
| } | ||
|
|
||
| for (final ObjectLongCursor<String> cursor : seqNoPrimaryContext.trackingLocalCheckpoints()) { | ||
| updateLocalCheckpoint(cursor.key, cursor.value); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint | ||
| * on the specified shard advances above the current global checkpoint. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.elasticsearch.index.seqno; | ||
|
|
||
| import com.carrotsearch.hppc.ObjectLongHashMap; | ||
| import com.carrotsearch.hppc.ObjectLongMap; | ||
| import com.carrotsearch.hppc.cursors.ObjectLongCursor; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| /** | ||
| * Represents the sequence number component of the primary context. This is the knowledge on the primary of the in-sync and initializing | ||
| * shards and their local checkpoints. | ||
| */ | ||
| public class SeqNoPrimaryContext implements Writeable { | ||
|
||
|
|
||
| private ObjectLongMap<String> inSyncLocalCheckpoints; | ||
|
|
||
| public ObjectLongMap<String> inSyncLocalCheckpoints() { | ||
| return inSyncLocalCheckpoints; | ||
| } | ||
|
|
||
| private ObjectLongMap<String> trackingLocalCheckpoints; | ||
|
|
||
| public ObjectLongMap<String> trackingLocalCheckpoints() { | ||
| return trackingLocalCheckpoints; | ||
| } | ||
|
|
||
| public SeqNoPrimaryContext(final ObjectLongMap<String> inSyncLocalCheckpoints, final ObjectLongMap<String> trackingLocalCheckpoints) { | ||
| this.inSyncLocalCheckpoints = inSyncLocalCheckpoints; | ||
| this.trackingLocalCheckpoints = trackingLocalCheckpoints; | ||
| } | ||
|
|
||
| public SeqNoPrimaryContext(StreamInput in) throws IOException { | ||
| inSyncLocalCheckpoints = readMap(in); | ||
| trackingLocalCheckpoints = readMap(in); | ||
| } | ||
|
|
||
| private static ObjectLongMap<String> readMap(final StreamInput in) throws IOException { | ||
| final int length = in.readInt(); | ||
| final ObjectLongMap<String> map = new ObjectLongHashMap<>(length); | ||
| for (int i = 0; i < length; i++) { | ||
| final String key = in.readString(); | ||
| final long value = in.readZLong(); | ||
| map.addTo(key, value); | ||
| } | ||
| return map; | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(final StreamOutput out) throws IOException { | ||
| writeMap(out, inSyncLocalCheckpoints); | ||
| writeMap(out, trackingLocalCheckpoints); | ||
| } | ||
|
|
||
| private static void writeMap(final StreamOutput out, final ObjectLongMap<String> map) throws IOException { | ||
| out.writeInt(map.size()); | ||
| for (ObjectLongCursor<String> cursor : map) { | ||
| out.writeString(cursor.key); | ||
| out.writeZLong(cursor.value); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,8 @@ | |
|
|
||
| package org.elasticsearch.index.seqno; | ||
|
|
||
| import com.carrotsearch.hppc.ObjectLongHashMap; | ||
| import com.carrotsearch.hppc.ObjectLongMap; | ||
| import org.elasticsearch.index.IndexSettings; | ||
| import org.elasticsearch.index.shard.AbstractIndexShardComponent; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
|
|
@@ -174,6 +176,15 @@ public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, | |
| globalCheckpointTracker.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); | ||
| } | ||
|
|
||
| /** | ||
| * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. | ||
| * | ||
| * @param seqNoPrimaryContext the sequence number context | ||
| */ | ||
| public void updateAllocationIdsFromPrimaryContext(final SeqNoPrimaryContext seqNoPrimaryContext) { | ||
| globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(seqNoPrimaryContext); | ||
| } | ||
|
|
||
| /** | ||
| * Check if there are any recoveries pending in-sync. | ||
| * | ||
|
|
@@ -183,4 +194,18 @@ public boolean pendingInSync() { | |
| return globalCheckpointTracker.pendingInSync(); | ||
| } | ||
|
|
||
| /** | ||
| * Get the sequence number primary context for the shard. This includes the state of the global checkpoint tracker. | ||
| * | ||
| * @return the sequence number primary context | ||
| */ | ||
| public SeqNoPrimaryContext seqNoPrimaryContext() { | ||
| synchronized (globalCheckpointTracker) { | ||
|
||
| final ObjectLongMap<String> inSyncLocalCheckpoints = new ObjectLongHashMap<>(globalCheckpointTracker.inSyncLocalCheckpoints); | ||
| final ObjectLongMap<String> trackingLocalCheckpoints = | ||
| new ObjectLongHashMap<>(globalCheckpointTracker.trackingLocalCheckpoints); | ||
| return new SeqNoPrimaryContext(inSyncLocalCheckpoints, trackingLocalCheckpoints); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,7 @@ | |
| import org.elasticsearch.common.util.BigArrays; | ||
| import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
| import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; | ||
| import org.elasticsearch.common.util.concurrent.CountDown; | ||
| import org.elasticsearch.index.Index; | ||
| import org.elasticsearch.index.IndexModule; | ||
| import org.elasticsearch.index.IndexNotFoundException; | ||
|
|
@@ -95,6 +96,7 @@ | |
| import org.elasticsearch.index.refresh.RefreshStats; | ||
| import org.elasticsearch.index.search.stats.SearchStats; | ||
| import org.elasticsearch.index.search.stats.ShardSearchStats; | ||
| import org.elasticsearch.index.seqno.SeqNoPrimaryContext; | ||
| import org.elasticsearch.index.seqno.SeqNoStats; | ||
| import org.elasticsearch.index.seqno.SequenceNumbersService; | ||
| import org.elasticsearch.index.similarity.SimilarityService; | ||
|
|
@@ -478,7 +480,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta | |
| } | ||
| } | ||
|
|
||
| public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { | ||
| public void relocated(final String reason, final Runnable onBlocked) throws IllegalIndexShardStateException, InterruptedException { | ||
|
||
| assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; | ||
| try { | ||
| indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { | ||
|
|
@@ -498,6 +500,7 @@ public void relocated(String reason) throws IllegalIndexShardStateException, Int | |
| throw new IllegalIndexShardStateException(shardId, IndexShardState.STARTED, | ||
| ": shard is no longer relocating " + shardRouting); | ||
| } | ||
| onBlocked.run(); | ||
|
||
| changeState(IndexShardState.RELOCATED, reason); | ||
| } | ||
| }); | ||
|
|
@@ -510,6 +513,17 @@ public void relocated(String reason) throws IllegalIndexShardStateException, Int | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Obtain the primary context for the shard. The shard must be serving as the relocation source for a primary shard. | ||
| * | ||
| * @return the primary for the shard | ||
| */ | ||
| public PrimaryContext primaryContext() { | ||
| verifyPrimary(); | ||
| assert shardRouting.relocating() : "primary context can only be obtained from a relocating primary but was " + shardRouting; | ||
| assert !shardRouting.isRelocationTarget() : "primary context can only be obtained from relocation source but was " + shardRouting; | ||
|
||
| return new PrimaryContext(getEngine().seqNoService().seqNoPrimaryContext()); | ||
| } | ||
|
|
||
| public IndexShardState state() { | ||
| return state; | ||
|
|
@@ -1249,16 +1263,16 @@ private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardSta | |
|
|
||
| private void verifyPrimary() { | ||
| if (shardRouting.primary() == false) { | ||
| throw new IllegalStateException("shard is not a primary " + shardRouting); | ||
| throw new IllegalStateException("shard " + shardRouting + " is not a primary"); | ||
| } | ||
| } | ||
|
|
||
| private void verifyReplicationTarget() { | ||
| final IndexShardState state = state(); | ||
| if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { | ||
| // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException | ||
| throw new IllegalStateException("active primary shard cannot be a replication target before " + | ||
| " relocation hand off " + shardRouting + ", state is [" + state + "]"); | ||
| throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " + | ||
| "relocation hand off, state is [" + state + "]"); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1524,19 +1538,62 @@ public void waitForOpsToComplete(final long seqNo) throws InterruptedException { | |
| /** | ||
| * Marks the shard with the provided allocation ID as in-sync with the primary shard. See | ||
| * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} | ||
| * for additional details. | ||
| * for additional details. Because this operation could be completed asynchronously on another thread, the caller must provide a latch; | ||
| * this latch will be counted down after the operation completes. | ||
|
||
| * | ||
| * @param allocationId the allocation ID of the shard to mark as in-sync | ||
| * @param localCheckpoint the current local checkpoint on the shard | ||
| * @param allocationId the allocation ID of the shard to mark as in-sync | ||
| * @param localCheckpoint the current local checkpoint on the shard | ||
| * @param latch a latch that is counted down after the operation completes | ||
| * @param onFailureConsumer a callback if an exception occurs completing the operation | ||
| */ | ||
| public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { | ||
| verifyPrimary(); | ||
| getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); | ||
| public void markAllocationIdAsInSync( | ||
|
||
| final String allocationId, | ||
| final long localCheckpoint, | ||
| final CountDownLatch latch, | ||
| final Consumer<Exception> onFailureConsumer) { | ||
| /* | ||
| * We could have blocked waiting for the replica to catch up that we fell idle and there will not be a background sync to the | ||
| * replica; mark our self as active to force a future background sync. | ||
| * Before marking the shard as in-sync we acquire an operation permit. We do this so that there is a barrier between marking a shard | ||
| * as in-sync and relocating a shard. If we acquire the permit then no relocation handoff can complete before we are done marking | ||
| * the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire the permit | ||
| * then the state of the shard will be relocated and this recovery will fail. | ||
| */ | ||
| active.compareAndSet(false, true); | ||
| acquirePrimaryOperationPermit( | ||
| new ActionListener<Releasable>() { | ||
| @Override | ||
| public void onResponse(final Releasable releasable) { | ||
| // we could have been relocated while waiting for a permit | ||
| if (state() == IndexShardState.RELOCATED) { | ||
| onFailure(new IndexShardRelocatedException(shardId)); | ||
| } | ||
| boolean success = false; | ||
| try { | ||
| getEngine().seqNoService().markAllocationIdAsInSync(allocationId, localCheckpoint); | ||
| /* | ||
| * We could have blocked so long waiting for the replica to catch up that we fell idle and there will not be a | ||
|
||
| * background sync to the replica; mark our self as active to force a future background sync. | ||
| */ | ||
| active.compareAndSet(false, true); | ||
| success = true; | ||
| } catch (final Exception e) { | ||
| onFailure(e); | ||
| } finally { | ||
| releasable.close(); | ||
| if (success) { | ||
| latch.countDown(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(final Exception e) { | ||
| try { | ||
| onFailureConsumer.accept(e); | ||
| } finally { | ||
| latch.countDown(); | ||
| } | ||
| } | ||
| }, | ||
| ThreadPool.Names.GENERIC); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1599,6 +1656,20 @@ public void updateAllocationIdsFromMaster(final Set<String> activeAllocationIds, | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source. | ||
| * | ||
| * @param seqNoPrimaryContext the sequence number context | ||
| */ | ||
| public void updateAllocationIdsFromPrimaryContext(final SeqNoPrimaryContext seqNoPrimaryContext) { | ||
| verifyPrimary(); | ||
| assert shardRouting.isRelocationTarget(); | ||
|
||
| final Engine engine = getEngineOrNull(); | ||
| if (engine != null) { | ||
| engine.seqNoService().updateAllocationIdsFromPrimaryContext(seqNoPrimaryContext); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Check if there are any recoveries pending in-sync. | ||
| * | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.