diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 7bdeb5e4b199e..0f2298424221b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.ingest.IngestActionForwarder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -44,6 +45,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -189,7 +191,14 @@ public static ActionListe } @Override - protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { + protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener outerListener) { + // As a work-around to support `?refresh`, explicitly replace the refresh policy with a call to the Refresh API. + // TODO: Replace with a less hacky approach. + ActionListener listener = outerListener; + if (DiscoveryNode.isStateless(clusterService.getSettings()) && bulkRequest.getRefreshPolicy() != WriteRequest.RefreshPolicy.NONE) { + listener = outerListener.delegateFailure((l, r) -> { client.admin().indices().prepareRefresh().execute(l.map(ignored -> r)); }); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + } /* * This is called on the Transport tread so we can check the indexing * memory pressure *quickly* but we don't want to keep the transport diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 06ff4a8fed51c..3deae4081816d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -100,6 +100,7 @@ private void indicesThatCannotBeCreatedTestCase( when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); when(clusterService.state()).thenReturn(state); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class); when(state.getNodes()).thenReturn(discoveryNodes); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index af9bbb3b22798..3343f1f9f6eb0 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -188,6 +188,7 @@ public void setupAction() { DiscoveryNode localNode = mock(DiscoveryNode.class); when(localNode.isIngestNode()).thenAnswer(stub -> localIngest); when(clusterService.localNode()).thenReturn(localNode); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); remoteNode1 = mock(DiscoveryNode.class); remoteNode2 = mock(DiscoveryNode.class); nodes = mock(DiscoveryNodes.class);