Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,32 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -34,8 +45,11 @@
*/
public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {

private static final Logger logger = LogManager.getLogger(TransportGetAction.class);

private final IndicesService indicesService;
private final ExecutorSelector executorSelector;
private final NodeClient client;

@Inject
public TransportGetAction(
Expand All @@ -45,7 +59,8 @@ public TransportGetAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ExecutorSelector executorSelector
ExecutorSelector executorSelector,
NodeClient client
) {
super(
GetAction.NAME,
Expand All @@ -59,6 +74,7 @@ public TransportGetAction(
);
this.indicesService = indicesService;
this.executorSelector = executorSelector;
this.client = client;
// register the internal TransportGetFromTranslogAction
new TransportGetFromTranslogAction(transportService, indicesService, actionFilters);
}
Expand All @@ -78,7 +94,10 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
request.request().routing(),
request.request().preference()
);
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator);
if (iterator == null) {
return null;
}
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
}

@Override
Expand All @@ -91,6 +110,16 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry() == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this happen? Looks like we init it in IndexShard constructor and then only update it to non-null values?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure. I vaguely remember running into an issue in the beginning when I was working on this, and adding this helped. Maybe the issue was something else. I do see that similar checks are done in a couple of other transport actions, e.g., TransportIndicesStatsAction and DataStreamsStatsTransportAction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but it looks like both come from TransportIndicesStatsAction, which was written in 2014 and thus on a very different code base. I think we should assume this does not happen, since we assume so in other actions, for instance TransportGetFromTranslogAction, PostWriteRefresh and many others. Unless you know where it can happen, I suggest removing the new null check; we do not want null checks all over for things that cannot be null.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree!

listener.onFailure(new ShardNotFoundException(shardId));
return;
}
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
: "A TransportGetAction should always be handled by a search shard in Stateless";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
asyncGet(request, shardId, listener);
} else {
Expand Down Expand Up @@ -148,6 +177,67 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
}
}

/**
 * Serves a get request on a shard copy that is not promotable to primary.
 * <ul>
 *   <li>If a refresh was requested, a shard refresh action is executed locally and, once it completes,
 *       the regular shard operation of the parent class is run.</li>
 *   <li>If the get is real-time, a {@link TransportGetFromTranslogAction} request is sent to the node holding
 *       the primary. A non-null result is returned directly; otherwise the get is served locally, either
 *       immediately (segment generation {@code -1}) or after waiting for the reported segment generation.</li>
 *   <li>Otherwise the regular shard operation of the parent class is run.</li>
 * </ul>
 *
 * @param request    the get request being served
 * @param indexShard the local shard copy the request was routed to
 * @param listener   notified with the response or with any failure
 * @throws IOException if the parent class' shard operation throws it
 */
private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
    throws IOException {
    final ShardId shardId = indexShard.shardId();
    // Resolved before branching: this lookup throws if the primary is not active, whichever path is taken below.
    final DiscoveryNode primaryNode = getCurrentNodeOfPrimary(shardId);

    if (request.refresh()) {
        logger.trace("send refresh action for shard {} to node {}", shardId, primaryNode.getId());
        final BasicReplicationRequest refresh = new BasicReplicationRequest(shardId);
        refresh.setParentTask(request.getParentTask());
        client.executeLocally(
            TransportShardRefreshAction.TYPE,
            refresh,
            ActionListener.wrap(ignored -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
        );
        return;
    }

    if (request.realtime() == false) {
        // A non-real-time get with no explicit refresh requested.
        super.asyncShardOperation(request, shardId, listener);
        return;
    }

    // Real-time get: ask the node holding the primary first.
    final TransportGetFromTranslogAction.Request translogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
    translogRequest.setParentTask(request.getParentTask());

    final ActionListener<TransportGetFromTranslogAction.Response> onTranslogResponse = listener.delegateFailure((l, response) -> {
        if (response.getResult() != null) {
            logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
            l.onResponse(new GetResponse(response.getResult()));
            return;
        }
        logger.debug(
            "no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
            request.id(),
            response.segmentGeneration()
        );
        if (response.segmentGeneration() == -1) {
            // Nothing to wait for (no previous unsafe generation), just handle the Get locally.
            ActionRunnable.supply(listener, () -> shardOperation(request, shardId)).run();
            return;
        }
        assert response.segmentGeneration() > -1L;
        // Serve the get locally only after the local shard has reached the reported segment generation.
        indexShard.waitForSegmentGeneration(
            response.segmentGeneration(),
            ActionListener.wrap(ignored -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
        );
    });

    transportService.sendRequest(
        primaryNode,
        TransportGetFromTranslogAction.NAME,
        translogRequest,
        new ActionListenerResponseHandler<>(
            onTranslogResponse,
            TransportGetFromTranslogAction.Response::new,
            getExecutor(request, shardId)
        )
    );
}

private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
var shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
throw new NoShardAvailableActionException(shardId, "primary shard is not active");
}
DiscoveryNode node = clusterService.state().nodes().get(shardRoutingTable.primaryShard().currentNodeId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the same ClusterState instance here rather than asking for it again? That way, the null check below can turn into an assertion (assert node != null).

if (node == null) {
throw new NoShardAvailableActionException(shardId);
}
return node;
}

private IndexShard getIndexShard(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ public class InternalEngine extends Engine {

private final ByteSizeValue totalDiskSpace;

// Refresh source label passed to refreshIfNeeded when a real-time get triggers a refresh.
protected static final String REAL_TIME_GET_REFRESH_SOURCE = "realtime_get";
// Refresh source label used for the internal refresh issued when the version map is unsafe.
protected static final String UNSAFE_VERSION_MAP_REFRESH_SOURCE = "unsafe_version_map";

/**
 * Creates an engine from the given configuration by delegating to the three-argument constructor
 * with {@code IndexWriter.MAX_DOCS} and {@code LocalCheckpointTracker::new}.
 *
 * @param engineConfig the engine configuration, forwarded unchanged
 */
public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -848,7 +851,7 @@ protected GetResult realtimeGetUnderLock(
}
}
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
refreshIfNeeded(REAL_TIME_GET_REFRESH_SOURCE, versionValue.seqNo);
}
if (getFromSearcherIfNotInTranslog) {
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
Expand Down Expand Up @@ -960,7 +963,7 @@ private VersionValue getVersionFromMap(BytesRef id) {
// map so once we pass this point we can safely lookup from the version map.
if (versionMap.isUnsafe()) {
lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1);
refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
refreshInternalSearcher(UNSAFE_VERSION_MAP_REFRESH_SOURCE, true);
}
versionMap.enforceSafeAccess();
}
Expand Down Expand Up @@ -1929,6 +1932,10 @@ public RefreshResult maybeRefresh(String source) throws EngineException {
return refresh(source, SearcherScope.EXTERNAL, false);
}

/**
 * Performs a refresh with {@code SearcherScope.INTERNAL} by delegating to
 * {@code refresh(String, SearcherScope, boolean)}.
 *
 * @param source label identifying what triggered the refresh, forwarded unchanged
 * @param block  forwarded unchanged to {@code refresh}; presumably controls whether the call waits
 *               for the refresh to happen (confirm against {@code refresh})
 * @return the result returned by {@code refresh}
 * @throws EngineException as declared; propagated from {@code refresh}
 */
protected RefreshResult refreshInternalSearcher(String source, boolean block) throws EngineException {
return refresh(source, SearcherScope.INTERNAL, block);
}

final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
Expand Down Expand Up @@ -3052,7 +3059,7 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
synchronized (refreshIfNeededMutex) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL, true);
refreshInternalSearcher(source, true);
}
}
}
Expand Down