Skip to content

Commit

Permalink
[elasticsearch] Introduce PrimaryTermAndGeneration in core ES
Browse files Browse the repository at this point in the history
  • Loading branch information
arteam committed Nov 6, 2023
1 parent 3dd97ba commit 04e0712
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryTermAndGeneration;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -61,8 +62,7 @@ protected void unpromotableShardOperation(
ActionListener.run(responseListener, listener -> {
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
shard.waitForPrimaryTermAndGeneration(
request.getPrimaryTerm(),
request.getSegmentGeneration(),
new PrimaryTermAndGeneration(request.getPrimaryTerm(), request.getSegmentGeneration()),
listener.map(l -> ActionResponse.Empty.INSTANCE)
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryTermAndGeneration;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -216,8 +217,7 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexSh
assert r.segmentGeneration() > -1L;
assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
indexShard.waitForPrimaryTermAndGeneration(
r.primaryTerm(),
r.segmentGeneration(),
new PrimaryTermAndGeneration(r.primaryTerm(), r.segmentGeneration()),
listener.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryTermAndGeneration;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -206,8 +207,7 @@ private void handleMultiGetOnUnpromotableShard(
assert r.segmentGeneration() > -1L;
assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
indexShard.waitForPrimaryTermAndGeneration(
r.primaryTerm(),
r.segmentGeneration(),
new PrimaryTermAndGeneration(r.primaryTerm(), r.segmentGeneration()),
listener.delegateFailureAndWrap(
(ll, aLong) -> getExecutor(request, shardId).execute(
ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId))
Expand Down
16 changes: 5 additions & 11 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DenseVectorStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.PrimaryTermAndGeneration;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -2113,21 +2114,14 @@ public final EngineConfig getEngineConfig() {
return engineConfig;
}

/**
* Allows registering a listener for when the index shard is on a segment generation >= minGeneration.
*
* @deprecated use {@link #addPrimaryTermAndGenerationListener(long, long, ActionListener)} instead.
*/
@Deprecated
public void addSegmentGenerationListener(long minGeneration, ActionListener<Long> listener) {
addPrimaryTermAndGenerationListener(UNKNOWN_PRIMARY_TERM, minGeneration, listener);
}

/**
* Allows registering a listener for when the index shard is on a primary term >= minPrimaryTerm
* and a segment generation >= minGeneration.
*/
public void addPrimaryTermAndGenerationListener(long minPrimaryTerm, long minGeneration, ActionListener<Long> listener) {
public void addPrimaryTermAndGenerationListener(
PrimaryTermAndGeneration primaryTermAndGeneration,
ActionListener<PrimaryTermAndGeneration> listener
) {
throw new UnsupportedOperationException();
}

Expand Down
15 changes: 5 additions & 10 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4173,18 +4173,13 @@ public String toString() {
return "IndexShard(shardRouting=" + shardRouting + ")";
}

/**
* @deprecated use {@link #waitForPrimaryTermAndGeneration(long, long, ActionListener)} instead.
*/
@Deprecated
public void waitForSegmentGeneration(long segmentGeneration, ActionListener<Long> listener) {
waitForPrimaryTermAndGeneration(getOperationPrimaryTerm(), segmentGeneration, listener);
}

/**
* Registers a listener for an event when the shard advances to the provided primary term and segment generation
*/
public void waitForPrimaryTermAndGeneration(long primaryTerm, long segmentGeneration, ActionListener<Long> listener) {
getEngine().addPrimaryTermAndGenerationListener(primaryTerm, segmentGeneration, listener);
public void waitForPrimaryTermAndGeneration(
PrimaryTermAndGeneration primaryTermAndGeneration,
ActionListener<PrimaryTermAndGeneration> listener
) {
getEngine().addPrimaryTermAndGenerationListener(primaryTermAndGeneration, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.index.shard;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Comparator;

public record PrimaryTermAndGeneration(long primaryTerm, long generation) implements Writeable, Comparable<PrimaryTermAndGeneration> {

private static final Comparator<PrimaryTermAndGeneration> COMPARATOR = Comparator.comparing(PrimaryTermAndGeneration::primaryTerm)
.thenComparing(PrimaryTermAndGeneration::generation);

public static final PrimaryTermAndGeneration ZERO = new PrimaryTermAndGeneration(0, 0);

public PrimaryTermAndGeneration(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(primaryTerm);
out.writeVLong(generation);
}

@Override
public String toString() {
return "[primary term=" + primaryTerm + ", generation=" + generation + ']';
}

@Override
public int compareTo(PrimaryTermAndGeneration other) {
return COMPARATOR.compare(this, other);
}
}

0 comments on commit 04e0712

Please sign in to comment.