Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
Expand All @@ -44,6 +45,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -189,7 +191,14 @@ public static <Response extends ReplicationResponse & WriteResponse> ActionListe
}

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> outerListener) {
// As a work-around to support `?refresh`, explicitly replace the refresh policy with a call to the Refresh API.
// TODO: Replace with a less hacky approach.
ActionListener<BulkResponse> listener = outerListener;
if (DiscoveryNode.isStateless(clusterService.getSettings()) && bulkRequest.getRefreshPolicy() != WriteRequest.RefreshPolicy.NONE) {
listener = outerListener.delegateFailure((l, r) -> { client.admin().indices().prepareRefresh().execute(l.map(ignored -> r)); });
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
Copy link
Member Author

@pxsalehi pxsalehi Jan 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for the WAIT_UNTIL to work properly, we'd need to also integrate the refresh listeners mechanism in Stateless, since currently they don't get called back. Since we're going with work-arounds here to keep it short, I've just replaced the policy once we know we're calling a refresh afterwards anyway.

}
/*
* This is called on the Transport tread so we can check the indexing
* memory pressure *quickly* but we don't want to keep the transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private void indicesThatCannotBeCreatedTestCase(
when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA);
when(clusterService.state()).thenReturn(state);
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);

DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(state.getNodes()).thenReturn(discoveryNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public void setupAction() {
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.isIngestNode()).thenAnswer(stub -> localIngest);
when(clusterService.localNode()).thenReturn(localNode);
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
remoteNode1 = mock(DiscoveryNode.class);
remoteNode2 = mock(DiscoveryNode.class);
nodes = mock(DiscoveryNodes.class);
Expand Down