Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -556,11 +556,7 @@ public void testUpdateSettings() {
}

public void testSearchQueryThenFetch() throws Exception {
interceptTransportActions(
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME
);
interceptTransportActions(SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME);

String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
Expand All @@ -580,16 +576,13 @@ public void testSearchQueryThenFetch() throws Exception {
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME
);
// free context messages are not necessarily sent, but if they are, check their indices
assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME);
}

public void testSearchDfsQueryThenFetch() throws Exception {
interceptTransportActions(
SearchTransportService.DFS_ACTION_NAME,
SearchTransportService.QUERY_ID_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME
SearchTransportService.FETCH_ID_ACTION_NAME
);

String[] randomIndicesOrAliases = randomIndicesOrAliases();
Expand All @@ -611,8 +604,6 @@ public void testSearchDfsQueryThenFetch() throws Exception {
SearchTransportService.QUERY_ID_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME
);
// free context messages are not necessarily sent, but if they are, check their indices
assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME);
}

private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, getOriginalIndices(entry.getShardIndex()));
sendReleaseSearchContext(entry.getContextId(), connection);
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
Expand All @@ -752,10 +752,10 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
* @see org.elasticsearch.search.fetch.FetchSearchResult#getContextId()
*
*/
void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
if (connection != null) {
searchTransportService.sendFreeContext(connection, contextId, originalIndices);
searchTransportService.sendFreeContext(connection, contextId, ActionListener.noop());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ public void onFailure(Exception exception) {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(
querySearchRequest.contextId(),
connection,
context.getOriginalIndices(shardIndex)
);
context.sendReleaseSearchContext(querySearchRequest.contextId(), connection);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ protected static void releaseIrrelevantSearchContext(SearchPhaseResult searchPha
context.getLogger().trace("trying to release search context [{}]", phaseResult.getContextId());
SearchShardTarget shardTarget = phaseResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
context.sendReleaseSearchContext(
phaseResult.getContextId(),
connection,
context.getOriginalIndices(phaseResult.getShardIndex())
);
context.sendReleaseSearchContext(phaseResult.getContextId(), connection);
} catch (Exception e) {
context.getLogger().trace("failed to release context", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -124,24 +122,6 @@ public SearchTransportService(
this.responseWrapper = responseWrapper;
}

private static final ActionListenerResponseHandler<SearchFreeContextResponse> SEND_FREE_CONTEXT_LISTENER =
new ActionListenerResponseHandler<>(
ActionListener.noop(),
SearchFreeContextResponse::readFrom,
TransportResponseHandler.TRANSPORT_WORKER
);

public void sendFreeContext(Transport.Connection connection, final ShardSearchContextId contextId, OriginalIndices originalIndices) {
transportService.sendRequest(
connection,
FREE_CONTEXT_ACTION_NAME,
new SearchFreeContextRequest(originalIndices, contextId),
TransportRequestOptions.EMPTY,
// no need to respond if it was freed or not
SEND_FREE_CONTEXT_LISTENER
);
}

public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
Expand Down Expand Up @@ -370,43 +350,6 @@ private static class ClearScrollContextsRequest extends TransportRequest {
}
}

static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
private final OriginalIndices originalIndices;

SearchFreeContextRequest(OriginalIndices originalIndices, ShardSearchContextId id) {
super(id);
this.originalIndices = originalIndices;
}

SearchFreeContextRequest(StreamInput in) throws IOException {
super(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}

@Override
public String[] indices() {
if (originalIndices == null) {
return null;
}
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
if (originalIndices == null) {
return null;
}
return originalIndices.indicesOptions();
}

}

public static class SearchFreeContextResponse extends TransportResponse {

private static final SearchFreeContextResponse FREED = new SearchFreeContextResponse(true);
Expand Down Expand Up @@ -456,12 +399,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
SearchFreeContextResponse::readFrom
);

transportService.registerRequestHandler(
FREE_CONTEXT_ACTION_NAME,
freeContextExecutor,
SearchFreeContextRequest::new,
freeContextHandler
);
// TODO: remove this handler once the lowest compatible version stops using it
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, in -> {
var res = new ScrollFreeContextRequest(in);
// this handler exists for BwC purposes only, we don't need the original indices to free the context
OriginalIndices.readOriginalIndices(in);
return res;
}, freeContextHandler);
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom);

transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ long buildTookInMillis() {
}

@Override
public void sendReleaseSearchContext(
ShardSearchContextId contextId,
Transport.Connection connection,
OriginalIndices originalIndices
) {
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
releasedContexts.add(contextId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected void executePhaseOnShard(
}

@Override
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
releasedSearchContexts.add(contextId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,11 @@ public void testFanOutAndCollect() throws InterruptedException {
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(null, null, null) {
@Override
public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) {
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId));
Expand Down Expand Up @@ -363,7 +367,7 @@ public void run() {
for (int i = 0; i < results.getNumShards(); i++) {
TestSearchPhaseResult result = results.getAtomicArray().get(i);
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node), OriginalIndices.NONE);
sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node));
}
responseListener.onResponse(testResponse);
if (latchTriggered.compareAndSet(false, true) == false) {
Expand Down Expand Up @@ -421,8 +425,13 @@ public void testFanOutAndFail() throws InterruptedException {
);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(null, null, null) {

@Override
public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) {
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
assertNotNull(contextId);
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_ACTION_NAME;
import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.elasticsearch.test.NodeRoles.dataNode;
Expand Down Expand Up @@ -477,7 +477,7 @@ protected void ensureNoInitializingShards() {
*/
protected void ensureAllFreeContextActionsAreConsumed() throws Exception {
logger.info("--> waiting for all free_context tasks to complete within a reasonable time");
safeGet(clusterAdmin().prepareListTasks().setActions(FREE_CONTEXT_ACTION_NAME + "*").setWaitForCompletion(true).execute());
safeGet(clusterAdmin().prepareListTasks().setActions(FREE_CONTEXT_SCROLL_ACTION_NAME + "*").setWaitForCompletion(true).execute());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ public class RBACEngine implements AuthorizationEngine {
private static final String DELETE_SUB_REQUEST_REPLICA = TransportDeleteAction.NAME + "[r]";

private static final Logger logger = LogManager.getLogger(RBACEngine.class);

private static final Set<String> SCROLL_RELATED_ACTIONS = Set.of(
TransportSearchScrollAction.TYPE.name(),
SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME,
SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME,
SearchTransportService.QUERY_SCROLL_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME,
TransportClearScrollAction.NAME,
"indices:data/read/sql/close_cursor",
SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME
);

private final Settings settings;
private final CompositeRolesStore rolesStore;
private final FieldPermissionsCache fieldPermissionsCache;
Expand Down Expand Up @@ -319,7 +332,7 @@ public void authorizeIndexAction(
// need to validate that the action is allowed and then move on
listener.onResponse(role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED);
} else if (request instanceof IndicesRequest == false) {
if (isScrollRelatedAction(action)) {
if (SCROLL_RELATED_ACTIONS.contains(action)) {
// scroll is special
// some APIs are indices requests that are not actually associated with indices. For example,
// search scroll request, is categorized under the indices context, but doesn't hold indices names
Expand Down Expand Up @@ -999,17 +1012,6 @@ public int hashCode() {
}
}

private static boolean isScrollRelatedAction(String action) {
return action.equals(TransportSearchScrollAction.TYPE.name())
|| action.equals(SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME)
|| action.equals(SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME)
|| action.equals(SearchTransportService.QUERY_SCROLL_ACTION_NAME)
|| action.equals(SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME)
|| action.equals(TransportClearScrollAction.NAME)
|| action.equals("indices:data/read/sql/close_cursor")
|| action.equals(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME);
}

private static boolean isAsyncRelatedAction(String action) {
return action.equals(SubmitAsyncSearchAction.NAME)
|| action.equals(GetAsyncSearchAction.NAME)
Expand Down