From a9d297d2c41e35087e11c6c7813ca020879bd3fa Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 9 Dec 2016 16:57:01 +0100 Subject: [PATCH 01/17] wire level compatibility --- .../gradle/test/ClusterFormationTasks.groovy | 1 + .../action/DocWriteResponse.java | 11 +++++- .../admin/indices/stats/ShardStats.java | 9 ++++- .../action/index/IndexRequest.java | 3 +- .../replication/ReplicatedWriteRequest.java | 26 +++++++++++++ .../replication/ReplicationRequest.java | 16 -------- .../TransportReplicationAction.java | 38 ++++++++++++++----- .../seqno/GlobalCheckpointSyncAction.java | 18 +++++++-- qa/backwards-5.0/build.gradle | 2 +- qa/rolling-upgrade/build.gradle | 2 +- 10 files changed, 89 insertions(+), 37 deletions(-) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index 74cae08298bcb..4c6771ccda7c8 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -268,6 +268,7 @@ class ClusterFormationTasks { static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) { Map esConfig = [ 'cluster.name' : node.clusterName, + 'node.name' : "node-" + node.nodeNum, 'pidfile' : node.pidFile, 'path.repo' : "${node.sharedDir}/repo", 'path.shared_data' : "${node.sharedDir}/", diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java index 7a12ab8ace255..aef99494d9265 100644 --- a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action; +import org.elasticsearch.Version; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException { type = in.readString(); id = in.readString(); version = in.readZLong(); - seqNo = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } forcedRefresh = in.readBoolean(); result = Result.readFrom(in); } @@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(type); out.writeString(id); out.writeZLong(version); - out.writeZLong(seqNo); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } out.writeBoolean(forcedRefresh); result.writeTo(out); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index c503da12317de..d3ed87a27776b 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.stats; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -102,7 +103,9 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); - seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + } } @Override @@ -113,7 +116,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); - out.writeOptionalWriteable(seqNoStats); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeOptionalWriteable(seqNoStats); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 94f40851e890b..b75992fdf8e59 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -507,7 +507,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(routing); out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeOptionalString(null); + // timestamp, at this point #proccess was called which for previous versions meant this was set + out.writeOptionalString(Long.toString(System.currentTimeMillis())); out.writeOptionalWriteable(null); } out.writeBytesReference(source); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index fa02dac9e1e2d..7abcf8d913b30 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -19,12 +19,14 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -36,6 +38,9 @@ public abstract class ReplicatedWriteRequest> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + long seqNo; + + /** * Constructor for deserialization. */ @@ -62,11 +67,32 @@ public RefreshPolicy getRefreshPolicy() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readVLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); refreshPolicy.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeVLong(seqNo); + } + } + + /** + * Returns the sequence number for this operation. The sequence number is assigned while the operation + * is performed on the primary shard. + */ + public long seqNo() { + return seqNo; + } + + /** sets the sequence number for this operation. should only be called on the primary shard */ + public void seqNo(long seqNo) { + this.seqNo = seqNo; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index d520b3d4e70ce..9586bab3937b2 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -55,7 +55,6 @@ public abstract class ReplicationRequest(request, replica.allocationId().getId()), transportOptions, - // Eclipse can't handle when this is <> so we specify the type here. - new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + final ConcreteShardRequest concreteShardRequest = + new ConcreteShardRequest<>(request, replica.allocationId().getId()); + sendReplicaRequest(concreteShardRequest, node, listener); } @Override @@ -1060,6 +1070,14 @@ public void onFailure(Exception shardFailedError) { } } + /** sends the give replica request to the supplied nodes */ + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions, + // Eclipse can't handle when this is <> so we specify the type here. + new ActionListenerResponseHandler<>(listener, ReplicaResponse::new)); + } + /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static final class ConcreteShardRequest extends TransportRequest { diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 8e87729831398..8d6f821ee859d 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -20,20 +20,21 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -41,8 +42,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; -import java.io.UnsupportedEncodingException; public class GlobalCheckpointSyncAction extends TransportReplicationAction { @@ -65,6 +64,17 @@ protected ReplicationResponse newResponseInstance() { return new ReplicationResponse(); } + @Override + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.sendReplicaRequest(concreteShardRequest, node, listener); + } { + listener.onResponse( + new ReplicaResponse(concreteShardRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + @Override protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request, IndexShard indexShard) throws Exception { long checkpoint = indexShard.getGlobalCheckpoint(); diff --git a/qa/backwards-5.0/build.gradle b/qa/backwards-5.0/build.gradle index 797e1109ae795..9569e4e75f523 100644 --- a/qa/backwards-5.0/build.gradle +++ b/qa/backwards-5.0/build.gradle @@ -18,7 +18,7 @@ integTest { cluster { numNodes = 2 numBwcNodes = 1 - bwcVersion = "6.0.0-alpha1-SNAPSHOT" + bwcVersion = "5.2.0-SNAPSHOT" setting 'logger.org.elasticsearch', 'DEBUG' } } diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index e17e245410820..182e6a9f7d947 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -25,7 +25,7 @@ task oldClusterTest(type: RestIntegTestTask) { mustRunAfter(precommit) cluster { distribution = 'zip' - bwcVersion = '6.0.0-alpha1-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop + bwcVersion = '5.2.0-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop numBwcNodes = 2 numNodes = 2 clusterName = 'rolling-upgrade' From adabd898dd0df16097b85f21b5208dae6c563ccf Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 10 Dec 2016 21:15:45 +0100 Subject: [PATCH 02/17] bwc test --- .gitignore | 1 - .../elasticsearch/backwards/IndexingIT.java | 197 ++++++++++++++++++ .../test/rest/yaml/ObjectPath.java | 17 +- 3 files changed, 211 insertions(+), 4 deletions(-) create mode 100644 qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java diff --git a/.gitignore b/.gitignore index b4ec8795057e2..6bae7245f549a 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,5 @@ html_docs # random old stuff that we should look at the necessity of... /tmp/ -backwards/ eclipse-build diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java new file mode 100644 index 0000000000000..691f2754e4734 --- /dev/null +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -0,0 +1,197 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.backwards; + +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.Version; +import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +public class IndexingIT extends ESRestTestCase { + + private ObjectPath objectPath(Response response) throws IOException { + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + String contentType = response.getHeader("Content-Type"); + XContentType xContentType = XContentType.fromMediaTypeOrFormat(contentType); + return ObjectPath.createFromXContent(xContentType.xContent(), body); + } + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings, true) + " }"))); + + } + + public void testGlobalCheckpoints() throws Exception { + Nodes nodes = buildNodeAndVersions(); + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), nodes.size() - 1); + final boolean checkGlobalCheckpoints = nodes.getMaster().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED); + logger.info("master version is [{}], global checkpoints will be [{}]", nodes.getMaster().getVersion(), + checkGlobalCheckpoints ? "checked" : "not be checked"); + if (checkGlobalCheckpoints) { + settings.put(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), "100ms"); + } + createIndex("test", settings.build()); + + final int numDocs = randomInt(10); + for (int i = 0; i < numDocs; i++) { + assertOK(client().performRequest("PUT", "test/test/" + i, emptyMap(), + new StringEntity("{\"test\": \"test_" + i + "\"}"))); + } + assertBusy(() -> { + try { + Response response = client().performRequest("GET", "test/_stats", singletonMap("level", "shards")); + List shardStats = objectPath(response).evaluate("indices.test.shards.0"); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + final Node node = nodes.getSafe(nodeId); + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max"); + Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint"); + Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint"); + logger.info("stats for {}, primary [{}]: maxSeqNo [{}], localCheckpoint [{}], globalCheckpoint [{}]", + node, primary, maxSeqNo, localCheckpoint, globalCheckpoint); + assertThat("max_seq no on " + node + " is wrong", maxSeqNo, equalTo(numDocs)); + assertThat("localCheckpoint no on " + node + " is wrong", localCheckpoint, equalTo(numDocs - 1)); + if (checkGlobalCheckpoints) { + assertThat("globalCheckpoint no on " + node + " is wrong", globalCheckpoint, equalTo(numDocs - 1)); + } + } else { + logger.info("skipping seq no test on {}", node); + } + } + } catch (IOException e) { + throw new AssertionError("unexpected io error", e); + } + }); + } + + private Nodes buildNodeAndVersions() throws IOException { + Response response = client().performRequest("GET", "_nodes"); + ObjectPath objectPath = objectPath(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + Nodes nodes = new Nodes(); + for (String id : nodesAsMap.keySet()) { + nodes.add(new Node( + id, + objectPath.evaluate("nodes." + id + ".name"), + Version.fromString(objectPath.evaluate("nodes." + id + ".version")))); + } + response = client().performRequest("GET", "_cluster/state"); + nodes.setMasterNodeId(objectPath(response).evaluate("master_node")); + return nodes; + } + + final class Nodes extends HashMap { + + private String masterNodeId = null; + + public Node getMaster() { + return get(masterNodeId); + } + + public void setMasterNodeId(String id) { + if (get(id) == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found. got:" + toString()); + } + masterNodeId = id; + } + + public void add(Node node) { + put(node.getId(), node); + } + + public Node getSafe(String id) { + Node node = get(id); + if (node == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found"); + } + return node; + } + + @Override + public String toString() { + return "Nodes{" + + "masterNodeId='" + masterNodeId + "\'\n" + + values().stream().map(Node::toString).collect(Collectors.joining("\n")) + + '}'; + } + } + + final class Node { + final private String id; + final private String nodeName; + final private Version version; + + Node(String id, String nodeName, Version version) { + this.id = id; + this.nodeName = nodeName; + this.version = version; + } + + public String getId() { + return id; + } + + public String getNodeName() { + return nodeName; + } + + public Version getVersion() { + return version; + } + + @Override + public String toString() { + return "Node{" + + "id='" + id + '\'' + + ", nodeName='" + nodeName + '\'' + + ", version=" + version + + '}'; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java index 6311944fdcbb7..265fd7b3e8561 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java @@ -46,17 +46,28 @@ public ObjectPath(Object object) { this.object = object; } + + /** + * A utility method that creates an {@link ObjectPath} via {@link #ObjectPath(Object)} returns + * the result of calling {@link #evaluate(String)} on it. + */ + public static T evaluate(Object object, String path) throws IOException { + return new ObjectPath(object).evaluate(path, Stash.EMPTY); + } + + /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path) throws IOException { + public T evaluate(String path) throws IOException { return evaluate(path, Stash.EMPTY); } /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path, Stash stash) throws IOException { + @SuppressWarnings("unchecked") + public T evaluate(String path, Stash stash) throws IOException { String[] parts = parsePath(path); Object object = this.object; for (String part : parts) { @@ -65,7 +76,7 @@ public Object evaluate(String path, Stash stash) throws IOException { return null; } } - return object; + return (T)object; } @SuppressWarnings("unchecked") From 4cacc8bcf6587ce69d5b43f1aea0f5b33eeb71ca Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Dec 2016 10:14:41 +0100 Subject: [PATCH 03/17] strengthen test --- .../cluster/metadata/MappingMetaData.java | 2 +- qa/backwards-5.0/build.gradle | 6 +- .../elasticsearch/backwards/IndexingIT.java | 189 +++++++++++++++--- .../test/rest/ESRestTestCase.java | 8 +- 4 files changed, 170 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 3ea61385f1c3f..5d80420357b1d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException { // timestamp out.writeBoolean(false); // enabled out.writeString(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format()); - out.writeOptionalString(null); + out.writeOptionalString("now"); // old default out.writeOptionalBoolean(null); } out.writeBoolean(hasParentField()); diff --git a/qa/backwards-5.0/build.gradle b/qa/backwards-5.0/build.gradle index 9569e4e75f523..e1ada1ed8a2bf 100644 --- a/qa/backwards-5.0/build.gradle +++ b/qa/backwards-5.0/build.gradle @@ -11,13 +11,13 @@ apply plugin: 'elasticsearch.rest-test' * now you can run the bwc tests with: * gradle check -Drepos.mavenlocal=true * - * (-Drepos.mavenlocal=true will force gradle to look for the zip distribuiton in the local .m2 repository) + * (-Drepos.mavenlocal=true will force gradle to look for the zip distribution in the local .m2 repository) */ integTest { includePackaged = true cluster { - numNodes = 2 - numBwcNodes = 1 + numNodes = 4 + numBwcNodes = 2 bwcVersion = "5.2.0-SNAPSHOT" setting 'logger.org.elasticsearch', 'DEBUG' } diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 691f2754e4734..a2eeb93d05b3a 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -18,15 +18,19 @@ */ package org.elasticsearch.backwards; +import org.apache.http.HttpHost; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.Version; import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; @@ -57,59 +61,133 @@ private void assertOK(Response response) { assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); } + private void ensureGreen() throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + private void createIndex(String name, Settings settings) throws IOException { assertOK(client().performRequest("PUT", name, Collections.emptyMap(), new StringEntity("{ \"settings\": " + Strings.toString(settings, true) + " }"))); + } + private void updateIndexSetting(String name, Settings.Builder settings) throws IOException { + updateIndexSetting(name, settings.build()); + } + private void updateIndexSetting(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(), + new StringEntity(Strings.toString(settings, true)))); } - public void testGlobalCheckpoints() throws Exception { + protected int indexDocs(String index, final int idStart, final int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + final int id = idStart + i; + assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), + new StringEntity("{\"test\": \"test_" + id + "\"}"))); + } + return numDocs; + } + + public void testSeqNoCheckpoints() throws Exception { Nodes nodes = buildNodeAndVersions(); + logger.info("cluster discovered: {}", nodes.toString()); + final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); Settings.Builder settings = Settings.builder() .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), nodes.size() - 1); + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) + .put("index.routing.allocation.include._name", bwcNames); + final boolean checkGlobalCheckpoints = nodes.getMaster().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED); logger.info("master version is [{}], global checkpoints will be [{}]", nodes.getMaster().getVersion(), checkGlobalCheckpoints ? "checked" : "not be checked"); if (checkGlobalCheckpoints) { settings.put(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), "100ms"); } - createIndex("test", settings.build()); + final String index = "test"; + createIndex(index, settings.build()); + try (RestClient newNodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + int numDocs = indexDocs(index, 0, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); - final int numDocs = randomInt(10); - for (int i = 0; i < numDocs; i++) { - assertOK(client().performRequest("PUT", "test/test/" + i, emptyMap(), - new StringEntity("{\"test\": \"test_" + i + "\"}"))); + logger.info("allowing shards on all nodes"); + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + logger.info("indexing some more docs"); + numDocs += indexDocs(index, numDocs, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + logger.info("moving primary to new node"); + Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); + ensureGreen(); + logger.info("indexing some more docs"); + int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5)); + numDocs += numDocsOnNewPrimary; + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); } + } + + protected void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient newNodesClient) throws Exception { assertBusy(() -> { try { - Response response = client().performRequest("GET", "test/_stats", singletonMap("level", "shards")); - List shardStats = objectPath(response).evaluate("indices.test.shards.0"); - for (Object shard : shardStats) { - final String nodeId = ObjectPath.evaluate(shard, "routing.node"); - final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); - final Node node = nodes.getSafe(nodeId); - if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max"); - Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint"); - Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint"); - logger.info("stats for {}, primary [{}]: maxSeqNo [{}], localCheckpoint [{}], globalCheckpoint [{}]", - node, primary, maxSeqNo, localCheckpoint, globalCheckpoint); - assertThat("max_seq no on " + node + " is wrong", maxSeqNo, equalTo(numDocs)); - assertThat("localCheckpoint no on " + node + " is wrong", localCheckpoint, equalTo(numDocs - 1)); + List shards = buildShards(nodes, newNodesClient); + Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); + assertNotNull("failed to find primary shard", primaryShard); + final long expectedGlobalCkp; + final long expectMaxSeqNo; + logger.info("primary resolved to node {}", primaryShard.getNode()); + if (primaryShard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + expectMaxSeqNo = numDocs - 1; + expectedGlobalCkp = numDocs - 1; + } else { + expectedGlobalCkp = SequenceNumbersService.UNASSIGNED_SEQ_NO; + expectMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + } + for (Shard shard : shards) { + if (shard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + final SeqNoStats seqNoStats = shard.getSeqNoStats(); + logger.info("stats for {}, primary [{}]: [{}]", shard.getNode(), shard.isPrimary(), seqNoStats); + assertThat("max_seq no on " + shard.getNode() + " is wrong", seqNoStats.getMaxSeqNo(), equalTo(expectMaxSeqNo)); + assertThat("localCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getLocalCheckpoint(), equalTo(expectMaxSeqNo)); if (checkGlobalCheckpoints) { - assertThat("globalCheckpoint no on " + node + " is wrong", globalCheckpoint, equalTo(numDocs - 1)); + assertThat("globalCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getGlobalCheckpoint(), equalTo(expectedGlobalCkp)); } } else { - logger.info("skipping seq no test on {}", node); + logger.info("skipping seq no test on {}", shard.getNode()); } } } catch (IOException e) { - throw new AssertionError("unexpected io error", e); + throw new AssertionError("unexpected io exception", e); } }); } + private List buildShards(Nodes nodes, RestClient client) throws IOException { + Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); + List shardStats = objectPath(response).evaluate("indices.test.shards.0"); + ArrayList shards = new ArrayList<>(); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + final Node node = nodes.getSafe(nodeId); + final SeqNoStats seqNoStats; + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max"); + Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint"); + Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint"); + seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + } else { + seqNoStats = null; + } + shards.add(new Shard(node, primary, seqNoStats)); + } + return shards; + } + private Nodes buildNodeAndVersions() throws IOException { Response response = client().performRequest("GET", "_nodes"); ObjectPath objectPath = objectPath(response); @@ -119,7 +197,8 @@ private Nodes buildNodeAndVersions() throws IOException { nodes.add(new Node( id, objectPath.evaluate("nodes." + id + ".name"), - Version.fromString(objectPath.evaluate("nodes." + id + ".version")))); + Version.fromString(objectPath.evaluate("nodes." + id + ".version")), + HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address")))); } response = client().performRequest("GET", "_cluster/state"); nodes.setMasterNodeId(objectPath(response).evaluate("master_node")); @@ -145,6 +224,23 @@ public void add(Node node) { put(node.getId(), node); } + public List getNewNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().after(bwcVersion)).collect(Collectors.toList()); + } + + public List getBWCNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().equals(bwcVersion)).collect(Collectors.toList()); + } + + public Version getBWCVersion() { + if (isEmpty()) { + throw new IllegalStateException("no nodes available"); + } + return Version.fromId(values().stream().map(node -> node.getVersion().id).min(Integer::compareTo).get()); + } + public Node getSafe(String id) { Node node = get(id); if (node == null) { @@ -156,7 +252,7 @@ public Node getSafe(String id) { @Override public String toString() { return "Nodes{" + - "masterNodeId='" + masterNodeId + "\'\n" + + "masterNodeId='" + masterNodeId + "'\n" + values().stream().map(Node::toString).collect(Collectors.joining("\n")) + '}'; } @@ -166,11 +262,13 @@ final class Node { final private String id; final private String nodeName; final private Version version; + final private HttpHost publishAddress; - Node(String id, String nodeName, Version version) { + Node(String id, String nodeName, Version version, HttpHost publishAddress) { this.id = id; this.nodeName = nodeName; this.version = version; + this.publishAddress = publishAddress; } public String getId() { @@ -181,6 +279,10 @@ public String getNodeName() { return nodeName; } + public HttpHost getPublishAddress() { + return publishAddress; + } + public Version getVersion() { return version; } @@ -194,4 +296,37 @@ public String toString() { '}'; } } + + final class Shard { + final private Node node; + final private boolean Primary; + final private SeqNoStats seqNoStats; + + Shard(Node node, boolean primary, SeqNoStats seqNoStats) { + this.node = node; + Primary = primary; + this.seqNoStats = seqNoStats; + } + + public Node getNode() { + return node; + } + + public boolean isPrimary() { + return Primary; + } + + public SeqNoStats getSeqNoStats() { + return seqNoStats; + } + + @Override + public String toString() { + return "Shard{" + + "node=" + node + + ", Primary=" + Primary + + ", seqNoStats=" + seqNoStats + + '}'; + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 26952fdbe3697..d4ec8bac63670 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -111,8 +111,8 @@ public void initClient() throws IOException { } clusterHosts = unmodifiableList(hosts); logger.info("initializing REST clients against {}", clusterHosts); - client = buildClient(restClientSettings()); - adminClient = buildClient(restAdminSettings()); + client = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + adminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); } assert client != null; assert adminClient != null; @@ -272,8 +272,8 @@ protected String getProtocol() { return "http"; } - private RestClient buildClient(Settings settings) throws IOException { - RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + RestClientBuilder builder = RestClient.builder(hosts); String keystorePath = settings.get(TRUSTSTORE_PATH); if (keystorePath != null) { final String keystorePass = settings.get(TRUSTSTORE_PASSWORD); From c2cb646e3b158d17f6429c20f504069647909f49 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Dec 2016 10:33:42 +0100 Subject: [PATCH 04/17] only account for shards on new nodes for global checkpoints --- .../indices/cluster/IndicesClusterStateService.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index aaecba91f3e32..9d125c87012a3 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -553,9 +554,15 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shard.updateRoutingEntry(shardRouting); if (shardRouting.primary()) { IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - Set activeIds = indexShardRoutingTable.activeShards().stream().map(r -> r.allocationId().getId()) + Set activeIds = indexShardRoutingTable.activeShards().stream() + // filter to shards that track seq# and should be taken into consideration for checkpoint tracking + // shards on old nodes will go through a file based recovery which will also transfer seq# information. + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(r -> r.allocationId().getId()) .collect(Collectors.toSet()); - Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream().map(r -> r.allocationId().getId()) + Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream() + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(r -> r.allocationId().getId()) .collect(Collectors.toSet()); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } From 02b223caee123305b999206bb5b7eb5f0b685e0a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Dec 2016 11:10:21 +0100 Subject: [PATCH 05/17] fix timestamp for now as it makes assertion fail --- .../main/java/org/elasticsearch/action/index/IndexRequest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index b75992fdf8e59..c12b68edece96 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -508,7 +508,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { // timestamp, at this point #proccess was called which for previous versions meant this was set - out.writeOptionalString(Long.toString(System.currentTimeMillis())); + // nocommit: can we fix this in 5.x? how? + out.writeOptionalString("0"); out.writeOptionalWriteable(null); } out.writeBytesReference(source); From 2db7210e19bf6fda8d791ace1ac63362c3aa4e63 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Dec 2016 12:52:52 +0100 Subject: [PATCH 06/17] linting --- .../org/elasticsearch/backwards/IndexingIT.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index a2eeb93d05b3a..7bdefb045f266 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -259,10 +259,10 @@ public String toString() { } final class Node { - final private String id; - final private String nodeName; - final private Version version; - final private HttpHost publishAddress; + private final String id; + private final String nodeName; + private final Version version; + private final HttpHost publishAddress; Node(String id, String nodeName, Version version, HttpHost publishAddress) { this.id = id; @@ -298,9 +298,9 @@ public String toString() { } final class Shard { - final private Node node; - final private boolean Primary; - final private SeqNoStats seqNoStats; + private final Node node; + private final boolean Primary; + private final SeqNoStats seqNoStats; Shard(Node node, boolean primary, SeqNoStats seqNoStats) { this.node = node; From 634d92ffd9db1979d03e58278b0256ba437cea31 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Dec 2016 16:37:22 +0100 Subject: [PATCH 07/17] improve assertion message --- .../action/support/replication/ReplicationOperation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 47284789850a7..6d9bff203c477 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -283,7 +283,7 @@ protected List getShards(ShardId shardId, ClusterState state) { } private void decPendingAndFinishIfNeeded() { - assert pendingActions.get() > 0; + assert pendingActions.get() > 0 : "pending action count goes bellow 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); } From 4ea5dd9876947c8d97bfbbe5c100e968d64d1610 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 12 Dec 2016 18:26:15 +0100 Subject: [PATCH 08/17] line length --- .../src/test/java/org/elasticsearch/backwards/IndexingIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index 7bdefb045f266..87343e830e249 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -129,10 +129,10 @@ public void testSeqNoCheckpoints() throws Exception { } } - protected void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient newNodesClient) throws Exception { + private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { assertBusy(() -> { try { - List shards = buildShards(nodes, newNodesClient); + List shards = buildShards(nodes, client); Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); assertNotNull("failed to find primary shard", primaryShard); final long expectedGlobalCkp; From 001f7de14542c2a813509939b8073152ebb86791 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 13 Dec 2016 16:45:28 +0100 Subject: [PATCH 09/17] add skip version to cat.shards help test --- .../resources/rest-api-spec/test/cat.shards/10_basic.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml index 8c7cd83b0e039..4a37734d28462 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml @@ -1,5 +1,9 @@ --- "Help": + - skip: + version: " - 5.99.99" + reason: seq no stats were added in 6.0.0 + - do: cat.shards: help: true From cc9a486f0cd1df2c8a6281a52b5a6403afd6ad10 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 14 Dec 2016 09:22:33 +0100 Subject: [PATCH 10/17] force all replication requests to have toString --- .../admin/indices/flush/ShardFlushRequest.java | 2 +- .../replication/BasicReplicationRequest.java | 5 +++++ .../support/replication/ReplicationRequest.java | 8 +------- .../index/seqno/GlobalCheckpointSyncAction.java | 13 +++++++++++++ .../replication/ReplicationOperationTests.java | 12 ++---------- .../TransportReplicationActionTests.java | 5 +++++ .../replication/TransportWriteActionTests.java | 5 +++++ 7 files changed, 32 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 83eaf11ca3a9e..ac32b16eb5711 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "flush {" + super.toString() + "}"; + return "flush {" + shardId + "}"; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java index f431c67b2904b..b4731d19e29e4 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java @@ -37,4 +37,9 @@ public BasicReplicationRequest() { public BasicReplicationRequest(ShardId shardId) { super(shardId); } + + @Override + public String toString() { + return "BasicReplicationRequest{" + shardId + "}"; + } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 9586bab3937b2..091f96c408f67 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -236,13 +236,7 @@ public Request setShardId(ShardId shardId) { } @Override - public String toString() { - if (shardId != null) { - return shardId.toString(); - } else { - return index; - } - } + public abstract String toString(); // force a proper to string to ease debugging @Override public String getDescription() { diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 8d6f821ee859d..7acc3b5575e8a 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -115,6 +115,11 @@ private PrimaryRequest() { public PrimaryRequest(ShardId shardId) { super(shardId); } + + @Override + public String toString() { + return "GlobalCkpSyncPrimary{" + shardId + "}"; + } } public static final class ReplicaRequest extends ReplicationRequest { @@ -144,6 +149,14 @@ public void writeTo(StreamOutput out) throws IOException { public long getCheckpoint() { return checkpoint; } + + @Override + public String toString() { + return "GlobalCkpSyncReplica{" + + "checkpoint=" + checkpoint + + ", shardId=" + shardId + + '}'; + } } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 47325bf5f9818..74a57f3aa91e2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -33,8 +33,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.IndexShardNotStartedException; @@ -42,7 +40,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -366,13 +363,8 @@ public Request() { } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public String toString() { + return "Request{}"; } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 655244e286f68..4a9b70c7b9068 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -984,6 +984,11 @@ public void onRetry() { super.onRetry(); isRetrySet.set(true); } + + @Override + public String toString() { + return "Request{}"; + } } static class Response extends ReplicationResponse { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 571bbfa72e0f7..cd71418f0e5cd 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -202,6 +202,11 @@ private static class TestRequest extends ReplicatedWriteRequest { public TestRequest() { setShardId(new ShardId("test", "test", 1)); } + + @Override + public String toString() { + return "TestRequest{}"; + } } private static class TestResponse extends ReplicationResponse implements WriteResponse { From f2ca825dfef89a4f89c4b5289bc8e98bee236e13 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 14 Dec 2016 10:29:22 +0100 Subject: [PATCH 11/17] missing else :( --- .../elasticsearch/index/seqno/GlobalCheckpointSyncAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 7acc3b5575e8a..6e13573794d79 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -69,7 +69,7 @@ protected void sendReplicaRequest(ConcreteShardRequest concreteS ActionListener listener) { if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { super.sendReplicaRequest(concreteShardRequest, node, listener); - } { + } else { listener.onResponse( new ReplicaResponse(concreteShardRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); } From 059638bee5604d7f1f2a04dc0246a5b17484229b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 15 Dec 2016 15:34:41 +0100 Subject: [PATCH 12/17] feedback and nocommit removal --- .../action/delete/TransportDeleteAction.java | 4 ++-- .../elasticsearch/action/index/IndexRequest.java | 3 +-- .../action/index/TransportIndexAction.java | 14 +++++++------- .../replication/ReplicatedWriteRequest.java | 7 +++---- .../cluster/metadata/MappingMetaData.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 2 +- 6 files changed, 15 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 81f341090a59d..46eca2c84b9ae 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -39,7 +40,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -157,7 +157,7 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) { final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), - request.seqNo(), request.primaryTerm(), request.version(), request.versionType()); + request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType()); return replica.delete(delete); } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index ebb9c4c29cef6..6a9adc5fc1c49 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -524,8 +524,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(routing); out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { - // timestamp, at this point #proccess was called which for previous versions meant this was set - // nocommit: can we fix this in 5.x? how? + // timestamp, at this point #proccess was called which for 5.x meant this was set out.writeOptionalString("0"); out.writeOptionalWriteable(null); } diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index fee9e5f443cab..8361552af0ce0 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -48,7 +49,6 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; @@ -170,7 +170,7 @@ protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, Index final long version = indexResult.getVersion(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.seqNo(indexResult.getSeqNo()); + request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); @@ -197,9 +197,9 @@ public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest reque final Engine.Index operation; try { - operation = replica.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); + operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); } catch (MapperParsingException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { @@ -221,7 +221,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque try { operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); final ShardId shardId = primary.shardId(); @@ -232,12 +232,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); } catch (IllegalArgumentException e) { // throws IAE on conflicts merging dynamic mappings - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } try { operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index 7abcf8d913b30..c9c70344d68fc 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -38,8 +38,7 @@ public abstract class ReplicatedWriteRequest> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; - long seqNo; - + private long seqNo; /** * Constructor for deserialization. @@ -87,12 +86,12 @@ public void writeTo(StreamOutput out) throws IOException { * Returns the sequence number for this operation. The sequence number is assigned while the operation * is performed on the primary shard. */ - public long seqNo() { + public long getSeqNo() { return seqNo; } /** sets the sequence number for this operation. should only be called on the primary shard */ - public void seqNo(long seqNo) { + public void setSeqNo(long seqNo) { this.seqNo = seqNo; } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 5d80420357b1d..0f9db99326da8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException { // timestamp out.writeBoolean(false); // enabled out.writeString(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format()); - out.writeOptionalString("now"); // old default + out.writeOptionalString("now"); // 5.x default out.writeOptionalBoolean(null); } out.writeBoolean(hasParentField()); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 2425552c24642..02b6eca43a338 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -433,7 +433,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest reques final long version = indexResult.getVersion(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.seqNo(indexResult.getSeqNo()); + request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); } request.primaryTerm(primary.getPrimaryTerm()); From 2a15b1556c495cba428a978998ffb7f34df7859a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 16 Dec 2016 09:19:53 +0100 Subject: [PATCH 13/17] feedback --- .../java/org/elasticsearch/action/index/IndexRequest.java | 4 +++- .../action/support/replication/ReplicatedWriteRequest.java | 6 +++--- .../action/support/replication/ReplicationOperation.java | 2 +- .../support/replication/TransportReplicationAction.java | 5 ++--- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 6a9adc5fc1c49..5809280946c03 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -524,7 +524,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(routing); out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { - // timestamp, at this point #proccess was called which for 5.x meant this was set + // Serialize a fake timestamp. 5.x expect this value to be set by the #process method so we can't use null. + // On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for + // the transport layer OK as it will be ignored. out.writeOptionalString("0"); out.writeOptionalWriteable(null); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index c9c70344d68fc..107c791a069eb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -38,7 +38,7 @@ public abstract class ReplicatedWriteRequest> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; - private long seqNo; + private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; /** * Constructor for deserialization. @@ -67,7 +67,7 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in); if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - seqNo = in.readVLong(); + seqNo = in.readZLong(); } else { seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; } @@ -78,7 +78,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); refreshPolicy.writeTo(out); if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeVLong(seqNo); + out.writeZLong(seqNo); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 6d9bff203c477..25dcc29a5c3a3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -283,7 +283,7 @@ protected List getShards(ShardId shardId, ClusterState state) { } private void decPendingAndFinishIfNeeded() { - assert pendingActions.get() > 0 : "pending action count goes bellow 0 for request [" + request + "]"; + assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index aa2e47d58e0d6..91aab28b1a2cc 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -989,8 +989,7 @@ public void readFrom(StreamInput in) throws IOException { localCheckpoint = in.readZLong(); allocationId = in.readString(); } else { - // we use to read empty responses - Empty.INSTANCE.readFrom(in); + // 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing. } } @@ -1070,7 +1069,7 @@ public void onFailure(Exception shardFailedError) { } } - /** sends the give replica request to the supplied nodes */ + /** sends the given replica request to the supplied nodes */ protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, ActionListener listener) { transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions, From 647d5eed16cfb017dfd6d7a1b346a28431018b18 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 16 Dec 2016 17:07:03 +0100 Subject: [PATCH 14/17] update gitignore to explicitly point to backwards release folder (instead of a general glob) --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 6bae7245f549a..5d7dbbefdc8b8 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,10 @@ dependency-reduced-pom.xml # osx stuff .DS_Store +# default folders in which the create_bwc_index.py expects to find old es versions in +/backwards +/dev-tools/backwards + # needed in case docs build is run...maybe we can configure doc build to generate files under build? html_docs From f73d165eee95aa488977349a7e8f75db7a40ad75 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 17 Dec 2016 16:12:06 +0100 Subject: [PATCH 15/17] fix compilation --- .../action/bulk/TransportShardBulkAction.java | 13 ++++++------- .../action/delete/TransportDeleteAction.java | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index cef89e1ce7855..86024e4dcd592 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; -import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -151,7 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh final long version = indexResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.seqNo(indexResult.getSeqNo()); + indexRequest.setSeqNo(indexResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); @@ -175,7 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(deleteResult.getVersion()); - deleteRequest.seqNo(deleteResult.getSeqNo()); + deleteRequest.setSeqNo(deleteResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound()); @@ -286,7 +285,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind final long version = updateOperationResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.seqNo(updateOperationResult.getSeqNo()); + indexRequest.setSeqNo(updateOperationResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); } break; @@ -297,7 +296,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(updateOperationResult.getVersion()); - deleteRequest.seqNo(updateOperationResult.getSeqNo()); + deleteRequest.setSeqNo(updateOperationResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); } break; @@ -349,9 +348,9 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind break; } assert (replicaRequest.request() instanceof IndexRequest - && ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || + && ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || (replicaRequest.request() instanceof DeleteRequest - && ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); + && ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); // successful operation break; // out of retry loop } else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 91624bc36b64e..81f916191a2b4 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -129,7 +129,7 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde // update the request with the version so it will go to the replicas request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); request.version(result.getVersion()); - request.seqNo(result.getSeqNo()); + request.setSeqNo(result.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); response = new DeleteResponse( primary.shardId(), From 2cc261029a7b0236c102b1e0a0a9c282a8dc7d38 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 17 Dec 2016 22:28:31 +0100 Subject: [PATCH 16/17] don't replicate on failures (like version conflicts) --- .../elasticsearch/action/delete/TransportDeleteAction.java | 5 ++++- .../org/elasticsearch/action/index/TransportIndexAction.java | 5 ++++- .../support/replication/TransportReplicationAction.java | 4 +++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 81f916191a2b4..5601d54ea4740 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -125,12 +125,14 @@ protected DeleteResponse newResponseInstance() { protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception { final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary); final DeleteResponse response; + final DeleteRequest replicaRequest; if (result.hasFailure() == false) { // update the request with the version so it will go to the replicas request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); request.version(result.getVersion()); request.setSeqNo(result.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); + replicaRequest = request; response = new DeleteResponse( primary.shardId(), request.type(), @@ -140,8 +142,9 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde result.isFound()); } else { response = null; + replicaRequest = null; } - return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary); + return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 6c61055d82d23..9ed9f7f7cd11d 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -165,6 +165,7 @@ protected IndexResponse newResponseInstance() { protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception { final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction); final IndexResponse response; + final IndexRequest replicaRequest; if (indexResult.hasFailure() == false) { // update the version on request so it will happen on the replicas final long version = indexResult.getVersion(); @@ -172,12 +173,14 @@ protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, Index request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); + replicaRequest = request; response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); } else { response = null; + replicaRequest = null; } - return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); + return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 52699f9f43bd4..5b3611ea7b6ea 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -665,7 +665,7 @@ protected void doRun() { private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { setPhase(task, "waiting_on_primary"); if (logger.isTraceEnabled()) { - logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", + logger.trace("send action [{}] to local primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } performAction(node, transportPrimaryAction, true, new ConcreteShardRequest<>(request, primary.allocationId().getId())); @@ -951,6 +951,8 @@ public void failShard(String reason, Exception e) { public PrimaryResult perform(Request request) throws Exception { PrimaryResult result = shardOperationOnPrimary(request, indexShard); if (result.replicaRequest() != null) { + assert result.finalFailure == null : "a replica request [" + result.replicaRequest() + + "] with a primary failure [" + result.finalFailure + "]"; result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); } return result; From 7577b1202c11e6022946c64ef12d8dd354d94287 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 19 Dec 2016 10:47:04 +0100 Subject: [PATCH 17/17] put back diamond operator for ecplise --- .../action/support/replication/TransportReplicationAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 5b3611ea7b6ea..fe8ade181780b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1076,7 +1076,7 @@ protected void sendReplicaRequest(ConcreteShardRequest concreteS ActionListener listener) { transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions, // Eclipse can't handle when this is <> so we specify the type here. - new ActionListenerResponseHandler<>(listener, ReplicaResponse::new)); + new ActionListenerResponseHandler(listener, ReplicaResponse::new)); } /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/