Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,34 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.ingest.IngestTestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -149,6 +157,100 @@ public void testBulkWithGlobalDefaults() throws Exception {
}
}

public void testBulkRoutingToSingleShard() {
// create two indices, index1 disable bulk routing, index2 enable it
client().admin().indices().prepareCreate("index1").setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)).get();
client().admin().indices().prepareCreate("index2").setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, true)).get();

// bulk write into two indices
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i=0; i<100; i++) {
bulkRequestBuilder.add(client().prepareIndex("index1").setSource("field", "value" + i));
bulkRequestBuilder.add(client().prepareIndex("index2").setSource("field", "value" + i));
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
assertFalse(bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());

// check responses that index1 has 5 shards, index2 only has 1 shard
Map<String, Set<Integer>> shardsToIndex = new HashMap<>(2);
for (BulkItemResponse shardResponse : bulkResponse) {
shardsToIndex.computeIfAbsent(shardResponse.getIndex(), k -> new HashSet<>())
.add(shardResponse.getResponse().getShardId().getId());
}
assertEquals(shardsToIndex.get("index1").size(), 5);
assertEquals(shardsToIndex.get("index2").size(), 1);


// now dynamically enable index1 and disable index2's bulk routing
UpdateSettingsRequest updateSettingsRequest1 = new UpdateSettingsRequest("index1")
.settings(Settings.builder().put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, true));
client().admin().indices().updateSettings(updateSettingsRequest1).actionGet();
UpdateSettingsRequest updateSettingsRequest2 = new UpdateSettingsRequest("index2")
.settings(Settings.builder().put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, false));
client().admin().indices().updateSettings(updateSettingsRequest2).actionGet();

// continue bulk writing into two indices
BulkRequestBuilder bulkRequestBuilder1 = client().prepareBulk();
for (int i=0; i<100; i++) {
bulkRequestBuilder1.add(client().prepareIndex("index1").setSource("field", "value" + i));
bulkRequestBuilder1.add(client().prepareIndex("index2").setSource("field", "value" + i));
}
BulkResponse bulkResponse1 = bulkRequestBuilder1.get();
assertFalse(bulkResponse1.buildFailureMessage(), bulkResponse1.hasFailures());

// check responses that index1 has 1 shards, index2 has 5 shards
Map<String, Set<Integer>> shardsToIndex1 = new HashMap<>(2);
for (BulkItemResponse shardResponse : bulkResponse1) {
shardsToIndex1.computeIfAbsent(shardResponse.getIndex(), k -> new HashSet<>())
.add(shardResponse.getResponse().getShardId().getId());
}
assertEquals(shardsToIndex1.get("index1").size(), 1);
assertEquals(shardsToIndex1.get("index2").size(), 5);

// also enable index2's bulk routing
UpdateSettingsRequest updateSettingsRequest3 = new UpdateSettingsRequest("index2")
.settings(Settings.builder().put(IndexMetadata.INDEX_BULK_ROUTING_ENABLED, true));
client().admin().indices().updateSettings(updateSettingsRequest3).actionGet();

// continue bulk writing into two indices, this time with _id or _routing, cannot use the optimization
BulkRequestBuilder bulkRequestBuilder2 = client().prepareBulk();
for (int i=0; i<100; i++) {
bulkRequestBuilder2.add(client().prepareIndex("index1").setId("id" + i).setSource("field", "value" + i));
bulkRequestBuilder2.add(client().prepareIndex("index2").setRouting("routing" + i).setSource("field", "value" + i));
}
BulkResponse bulkResponse2 = bulkRequestBuilder2.get();
assertFalse(bulkResponse2.buildFailureMessage(), bulkResponse2.hasFailures());

// check responses that both index1 and index2 have 5 shards
Map<String, Set<Integer>> shardsToIndex2 = new HashMap<>(2);
for (BulkItemResponse shardResponse : bulkResponse2) {
shardsToIndex2.computeIfAbsent(shardResponse.getIndex(), k -> new HashSet<>())
.add(shardResponse.getResponse().getShardId().getId());
}
assertEquals(shardsToIndex2.get("index1").size(), 5);
assertEquals(shardsToIndex2.get("index2").size(), 5);

// make sure we could search it, we totally indexed three docs per term
client().admin().indices().prepareRefresh("index1").get();
SearchResponse searchResponse = client().prepareSearch("index1")
.setQuery(QueryBuilders.termQuery("field", "value1")).get();
assertThat(searchResponse.getFailedShards(), equalTo(0));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(3L));
// one of the hit has _routing, length is 10
String routing = null;
for (SearchHit hit : searchResponse.getHits()) {
if (hit.field("_routing") != null) {
routing = hit.field("_routing").getValue();
}
}
assertEquals(routing.length(), 10);
}

private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
XContentBuilder pipeline = jsonBuilder()
.startObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -404,6 +405,8 @@ protected void doRun() {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
// expect most of the case one bulk only contains single index
final Map<Index, String> indexRoutingMap = new HashMap<>(1);
Metadata metadata = clusterState.metadata();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
Expand All @@ -429,6 +432,19 @@ protected void doRun() {
MappingMetadata mappingMd = indexMetadata.mapping();
Version indexCreated = indexMetadata.getCreationVersion();
indexRequest.resolveRouting(metadata);
if (indexMetadata.isBulkRoutingEnabled()
&& indexRequest.routing() == null
&& indexRequest.id() == null) {
// neither _id nor _routing is specified,
// and "index.bulk_routing.enabled" setting is true
// group bulk request to a single shard
String tmpRouting = indexRoutingMap.get(concreteIndex);
if (tmpRouting == null) {
tmpRouting = UUIDs.randomBase64UUID().substring(12);
indexRoutingMap.put(concreteIndex, tmpRouting);
}
indexRequest.routing(tmpRouting);
}
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
break;
case UPDATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.IndexScope));
public static final Setting.AffixSetting<String> INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING =
Setting.prefixKeySetting("index.routing.allocation.initial_recovery.", key -> Setting.simpleString(key));

public static final String INDEX_BULK_ROUTING_ENABLED = "index.bulk_routing.enabled";
public static final Setting<Boolean> INDEX_BULK_ROUTING_ENABLED_SETTING =
Setting.boolSetting(INDEX_BULK_ROUTING_ENABLED, false, Property.Dynamic, Property.IndexScope);
/**
* The number of active shard copies to check for before proceeding with a write operation.
*/
Expand Down Expand Up @@ -390,6 +392,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;

private final boolean bulkRoutingEnabled;

private IndexMetadata(
final Index index,
final long version,
Expand Down Expand Up @@ -550,6 +554,10 @@ public ActiveShardCount getWaitForActiveShards() {
return waitForActiveShards;
}

public boolean isBulkRoutingEnabled() {
return bulkRoutingEnabled;
}

public Settings getSettings() {
return settings;
}
Expand Down Expand Up @@ -1255,6 +1263,7 @@ public IndexMetadata build() {
}

final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
final boolean bulkRoutingEnabled = settings.getAsBoolean(INDEX_BULK_ROUTING_ENABLED, false);

return new IndexMetadata(
new Index(index, uuid),
Expand All @@ -1281,7 +1290,8 @@ public IndexMetadata build() {
routingPartitionSize,
waitForActiveShards,
rolloverInfos.build(),
isSystem);
isSystem,
bulkRoutingEnabled);
}

public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_DATA_PATH_SETTING,
IndexMetadata.INDEX_HIDDEN_SETTING,
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_BULK_ROUTING_ENABLED_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down