diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index ef54c8883b07e..0b914e7c485dc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -25,18 +25,23 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.ingest.IngestTestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESIntegTestCase; import java.io.IOException; @@ -44,7 +49,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicInteger;
@@ -149,6 +157,105 @@ public void testBulkWithGlobalDefaults() throws Exception {
         }
     }
 
+    /**
+     * Verifies that {@code index.bulk_routing.enabled} groups the documents of a bulk request that carry neither
+     * an {@code _id} nor a {@code _routing} onto a single shard per index, that the setting can be toggled
+     * dynamically, and that requests with an explicit {@code _id} or {@code _routing} are not affected.
+     */
+    public void testBulkRoutingToSingleShard() {
+        // create two indices: bulk routing is disabled on index1 and enabled on index2
+        client().admin().indices().prepareCreate("index1").setSettings(Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)).get();
+        client().admin().indices().prepareCreate("index2").setSettings(Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, true)).get();
+
+        // bulk write into both indices
+        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
+        for (int i = 0; i < 100; i++) {
+            bulkRequestBuilder.add(client().prepareIndex("index1").setSource("field", "value" + i));
+            bulkRequestBuilder.add(client().prepareIndex("index2").setSource("field", "value" + i));
+        }
+        BulkResponse bulkResponse = bulkRequestBuilder.get();
+        assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
+
+        // index1 is spread over all 5 shards while index2 only hits a single shard
+        Map<String, Set<Integer>> shardsToIndex = shardIdsByIndex(bulkResponse);
+        assertEquals(5, shardsToIndex.get("index1").size());
+        assertEquals(1, shardsToIndex.get("index2").size());
+
+        // now dynamically enable bulk routing on index1 and disable it on index2
+        UpdateSettingsRequest updateSettingsRequest1 = new UpdateSettingsRequest("index1")
+            .settings(Settings.builder().put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, true));
+        client().admin().indices().updateSettings(updateSettingsRequest1).actionGet();
+        UpdateSettingsRequest updateSettingsRequest2 = new UpdateSettingsRequest("index2")
+            .settings(Settings.builder().put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, false));
+        client().admin().indices().updateSettings(updateSettingsRequest2).actionGet();
+
+        // continue bulk writing into both indices
+        BulkRequestBuilder bulkRequestBuilder1 = client().prepareBulk();
+        for (int i = 0; i < 100; i++) {
+            bulkRequestBuilder1.add(client().prepareIndex("index1").setSource("field", "value" + i));
+            bulkRequestBuilder1.add(client().prepareIndex("index2").setSource("field", "value" + i));
+        }
+        BulkResponse bulkResponse1 = bulkRequestBuilder1.get();
+        assertFalse(bulkResponse1.buildFailureMessage(), bulkResponse1.hasFailures());
+
+        // the roles are swapped: index1 hits a single shard, index2 is spread over all 5 shards
+        Map<String, Set<Integer>> shardsToIndex1 = shardIdsByIndex(bulkResponse1);
+        assertEquals(1, shardsToIndex1.get("index1").size());
+        assertEquals(5, shardsToIndex1.get("index2").size());
+
+        // also enable bulk routing on index2
+        UpdateSettingsRequest updateSettingsRequest3 = new UpdateSettingsRequest("index2")
+            .settings(Settings.builder().put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, true));
+        client().admin().indices().updateSettings(updateSettingsRequest3).actionGet();
+
+        // continue bulk writing, this time with an explicit _id or _routing, which cannot use the optimization
+        BulkRequestBuilder bulkRequestBuilder2 = client().prepareBulk();
+        for (int i = 0; i < 100; i++) {
+            bulkRequestBuilder2.add(client().prepareIndex("index1").setId("id" + i).setSource("field", "value" + i));
+            bulkRequestBuilder2.add(client().prepareIndex("index2").setRouting("routing" + i).setSource("field", "value" + i));
+        }
+        BulkResponse bulkResponse2 = bulkRequestBuilder2.get();
+        assertFalse(bulkResponse2.buildFailureMessage(), bulkResponse2.hasFailures());
+
+        // both index1 and index2 are spread over all 5 shards
+        Map<String, Set<Integer>> shardsToIndex2 = shardIdsByIndex(bulkResponse2);
+        assertEquals(5, shardsToIndex2.get("index1").size());
+        assertEquals(5, shardsToIndex2.get("index2").size());
+
+        // make sure the documents are searchable: three docs were indexed per term into index1
+        client().admin().indices().prepareRefresh("index1").get();
+        SearchResponse searchResponse = client().prepareSearch("index1")
+            .setQuery(QueryBuilders.termQuery("field", "value1")).get();
+        assertThat(searchResponse.getFailedShards(), equalTo(0));
+        assertThat(searchResponse.getHits().getTotalHits().value, equalTo(3L));
+        // only the doc indexed while bulk routing was enabled on index1 carries a generated _routing, of length 10
+        String routing = null;
+        for (SearchHit hit : searchResponse.getHits()) {
+            if (hit.field("_routing") != null) {
+                routing = hit.field("_routing").getValue();
+            }
+        }
+        assertNotNull("expected one hit to carry a generated _routing value", routing);
+        assertEquals(10, routing.length());
+    }
+
+    /**
+     * Groups the ids of the shards that handled the items of {@code bulkResponse} by index name.
+     */
+    private static Map<String, Set<Integer>> shardIdsByIndex(BulkResponse bulkResponse) {
+        Map<String, Set<Integer>> shardIdsByIndex = new HashMap<>(2);
+        for (BulkItemResponse itemResponse : bulkResponse) {
+            shardIdsByIndex.computeIfAbsent(itemResponse.getIndex(), k -> new HashSet<>())
+                .add(itemResponse.getResponse().getShardId().getId());
+        }
+        return shardIdsByIndex;
+    }
+
     private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
         XContentBuilder pipeline = jsonBuilder()
             .startObject()
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 553b03e189526..f24eec660fd8e 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -55,6 +55,7 @@
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.unit.TimeValue;
@@ -404,6 +405,8 @@ protected void doRun() {
                 return;
             }
             final 
ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
+            // routing value generated per concrete index; presized to 1 as a bulk is expected to mostly target one index
+            final Map<Index, String> indexRoutingMap = new HashMap<>(1);
             Metadata metadata = clusterState.metadata();
             for (int i = 0; i < bulkRequest.requests.size(); i++) {
                 DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
@@ -429,6 +432,15 @@ protected void doRun() {
                             MappingMetadata mappingMd = indexMetadata.mapping();
                             Version indexCreated = indexMetadata.getCreationVersion();
                             indexRequest.resolveRouting(metadata);
+                            if (indexMetadata.isBulkRoutingEnabled()
+                                && indexRequest.routing() == null
+                                && indexRequest.id() == null) {
+                                // neither _id nor _routing is specified and "index.bulk_routing.enabled" is true:
+                                // give every such request of this bulk the same random routing value per
+                                // concrete index so that they are all grouped onto a single shard
+                                indexRequest.routing(
+                                    indexRoutingMap.computeIfAbsent(concreteIndex, k -> UUIDs.randomBase64UUID().substring(12)));
+                            }
                             indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                             break;
                         case UPDATE:
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 02c683f4f8617..77eae946d2f8a 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -302,7 +302,16 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
             Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.IndexScope));
     public static final Setting.AffixSetting<String> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
         Setting.prefixKeySetting("index.routing.allocation.initial_recovery.", key -> Setting.simpleString(key));
 
+    /**
+     * Name of the dynamic index setting that, when {@code true}, routes the documents of a bulk request that
+     * carry neither an {@code _id} nor a {@code _routing} to a single shard of the index.
+     */
+    public static final String INDEX_BULK_ROUTING_ENABLED = "index.bulk_routing.enabled";
+    /** Boolean setting for {@link #INDEX_BULK_ROUTING_ENABLED}; defaults to {@code false}. */
+    public static final Setting<Boolean> INDEX_BULK_ROUTING_ENABLED_SETTING =
+        Setting.boolSetting(INDEX_BULK_ROUTING_ENABLED, false, Property.Dynamic, Property.IndexScope);
+
     /**
      * The number of active shard copies to check for before proceeding with a write operation.
      */
@@ -390,6 +399,9 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
     private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
     private final boolean isSystem;
 
+    // NOTE(review): the constructor must accept and assign this flag; no constructor hunk is part of this patch - confirm
+    private final boolean bulkRoutingEnabled;
+
     private IndexMetadata(
             final Index index,
             final long version,
@@ -550,6 +562,13 @@ public ActiveShardCount getWaitForActiveShards() {
         return waitForActiveShards;
     }
 
+    /**
+     * Returns whether {@link #INDEX_BULK_ROUTING_ENABLED_SETTING} is enabled for this index.
+     */
+    public boolean isBulkRoutingEnabled() {
+        return bulkRoutingEnabled;
+    }
+
     public Settings getSettings() {
         return settings;
     }
@@ -1255,6 +1274,7 @@ public IndexMetadata build() {
             }
 
             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
+            final boolean bulkRoutingEnabled = INDEX_BULK_ROUTING_ENABLED_SETTING.get(settings);
 
             return new IndexMetadata(
                     new Index(index, uuid),
@@ -1281,7 +1301,8 @@ public IndexMetadata build() {
                     routingPartitionSize,
                     waitForActiveShards,
                     rolloverInfos.build(),
-                    isSystem);
+                    isSystem,
+                    bulkRoutingEnabled);
         }
 
         public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException {
diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
index 093065961e335..8c6e324e4e08b 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
@@ -80,6 +80,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexMetadata.INDEX_DATA_PATH_SETTING,
         IndexMetadata.INDEX_HIDDEN_SETTING,
         IndexMetadata.INDEX_FORMAT_SETTING,
+        IndexMetadata.INDEX_BULK_ROUTING_ENABLED_SETTING,
         SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
         SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
         SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,