diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java index 34fb826d62382..9718607d8b80e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java @@ -70,7 +70,6 @@ static Request putWatch(PutWatchRequest putWatchRequest) { Request request = new Request(HttpPut.METHOD_NAME, endpoint); RequestConverters.Params params = new RequestConverters.Params(request) - .withVersion(putWatchRequest.getVersion()) .withIfSeqNo(putWatchRequest.ifSeqNo()) .withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm()); if (putWatchRequest.isActive() == false) { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java index 8b83970723dd2..1d13a77e06cf9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java @@ -21,7 +21,6 @@ import org.elasticsearch.client.Validatable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -43,11 +42,9 @@ public final class PutWatchRequest implements Validatable { private final BytesReference source; private final XContentType xContentType; private boolean active = true; - private long version = Versions.MATCH_ANY; private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { Objects.requireNonNull(id, "watch id is missing"); if (isValidId(id) == false) { @@ -95,14 +92,6 @@ public XContentType xContentType() { return xContentType; } - public long getVersion() { - return version; - } - - public void setVersion(long version) { - this.version = version; - } - /** * only performs this put request if the watch's last modification was assigned the given * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 9364e2ce2d57c..e3807b6067960 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -768,8 +768,6 @@ public void testUpdate() throws IOException { } } setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams); - setRandomVersion(updateRequest, expectedParams); - setRandomVersionType(updateRequest::versionType, expectedParams); setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body if (randomBoolean()) { int retryOnConflict = randomIntBetween(0, 5); @@ -911,14 +909,7 @@ public void testBulk() throws IOException { if (randomBoolean()) { docWriteRequest.routing(randomAlphaOfLength(10)); } - if (randomBoolean()) { - if (randomBoolean()) { - docWriteRequest.version(randomNonNegativeLong()); - } - if (randomBoolean()) { - docWriteRequest.versionType(randomFrom(VersionType.values())); - } - } else if (randomBoolean()) { + if (opType != DocWriteRequest.OpType.UPDATE && randomBoolean()) { docWriteRequest.setIfSeqNo(randomNonNegativeLong()); docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200)); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java index a31206bee88cc..19483cc201a5f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java @@ -29,8 +29,8 @@ import org.elasticsearch.client.watcher.DeactivateWatchRequest; import org.elasticsearch.client.watcher.DeleteWatchRequest; import org.elasticsearch.client.watcher.ExecuteWatchRequest; -import org.elasticsearch.client.watcher.PutWatchRequest; import org.elasticsearch.client.watcher.GetWatchRequest; +import org.elasticsearch.client.watcher.PutWatchRequest; import org.elasticsearch.client.watcher.StartWatchServiceRequest; import org.elasticsearch.client.watcher.StopWatchServiceRequest; import org.elasticsearch.client.watcher.WatcherStatsRequest; @@ -88,9 +88,12 @@ public void testPutWatch() throws Exception { } if (randomBoolean()) { - long version = randomLongBetween(10, 100); - putWatchRequest.setVersion(version); - expectedParams.put("version", String.valueOf(version)); + long seqNo = randomNonNegativeLong(); + long ifPrimaryTerm = randomLongBetween(1, 200); + putWatchRequest.setIfSeqNo(seqNo); + putWatchRequest.setIfPrimaryTerm(ifPrimaryTerm); + expectedParams.put("if_seq_no", String.valueOf(seqNo)); + expectedParams.put("if_primary_term", String.valueOf(ifPrimaryTerm)); } Request request = WatcherRequestConverters.putWatch(putWatchRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 1cd76e48ffff5..32e4ac223b357 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -170,7 +170,6 @@ public void testIndex() throws Exception { // tag::index-response String index = indexResponse.getIndex(); String id = indexResponse.getId(); - long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { // <1> } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { @@ -220,7 +219,8 @@ public void testIndex() throws Exception { IndexRequest request = new IndexRequest("posts") .id("1") .source("field", "value") - .version(1); + .setIfSeqNo(10L) + .setIfPrimaryTerm(20); try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { @@ -432,7 +432,8 @@ public void testUpdate() throws Exception { // tag::update-conflict UpdateRequest request = new UpdateRequest("posts", "1") .doc("field", "value") - .version(1); + .setIfSeqNo(101L) + .setIfPrimaryTerm(200L); try { UpdateResponse updateResponse = client.update( request, RequestOptions.DEFAULT); @@ -499,9 +500,10 @@ public void testUpdate() throws Exception { request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // <1> request.setRefreshPolicy("wait_for"); // <2> // end::update-request-refresh - // tag::update-request-version - request.version(2); // <1> - // end::update-request-version + // tag::update-request-cas + request.setIfSeqNo(2L); // <1> + request.setIfPrimaryTerm(1L); // <2> + // end::update-request-request-cas // tag::update-request-detect-noop request.detectNoop(false); // <1> // end::update-request-detect-noop @@ -630,7 +632,7 @@ public void testDelete() throws Exception { // tag::delete-conflict try { DeleteResponse deleteResponse = client.delete( - new DeleteRequest("posts", "1").version(2), + new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2), RequestOptions.DEFAULT); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { diff --git a/docs/java-rest/high-level/document/update.asciidoc b/docs/java-rest/high-level/document/update.asciidoc index 3112d85512217..35300512dfc3e 100644 --- a/docs/java-rest/high-level/document/update.asciidoc +++ b/docs/java-rest/high-level/document/update.asciidoc @@ -140,9 +140,10 @@ include-tagged::{doc-tests-file}[{api}-request-source-exclude] ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- -include-tagged::{doc-tests-file}[{api}-request-version] +include-tagged::{doc-tests-file}[{api}-request-cas] -------------------------------------------------- -<1> Version +<1> ifSeqNo +<2> ifPrimaryTerm ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/reference/migration/migrate_7_0/api.asciidoc b/docs/reference/migration/migrate_7_0/api.asciidoc index bb151edb778e2..6c1d03760f904 100644 --- a/docs/reference/migration/migrate_7_0/api.asciidoc +++ b/docs/reference/migration/migrate_7_0/api.asciidoc @@ -2,6 +2,22 @@ [[breaking_70_api_changes]] === API changes +[float] +==== Internal Versioning is no longer supported for optimistic concurrency control + +Elasticsearch maintains a numeric version field for each document it stores. That field +is incremented by one with every change to the document. Until 7.0.0 the API allowed using +that field for optimistic concurrency control, i.e., making a write operation conditional +on the current document version. Sadly, that approach is flawed because the value of the +version doesn't always uniquely represent a change to the document. If a primary fails +while handling a write operation, it may expose a version that will then be reused by the +new primary. + +Due to that issue, internal versioning can no longer be used and is replaced by a new +method based on sequence numbers. See <> for more details. + +Note that the `external` versioning type is still fully supported. + [float] ==== Camel case and underscore parameters deprecated in 6.x have been removed A number of duplicate parameters deprecated in 6.x have been removed from diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java index b317ea06d9f35..f7d8d037fcddb 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java @@ -20,11 +20,9 @@ package org.elasticsearch.index.reindex; import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.ParentTaskAssigningClient; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -33,18 +31,10 @@ */ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction { - private final boolean useSeqNoForCAS; - public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request, - ScriptService scriptService, ClusterState clusterState, ActionListener listener) { - super(task, - // not all nodes support sequence number powered optimistic concurrency control, we fall back to version - clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false, - // all nodes support sequence number powered optimistic concurrency control and we can use it - clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0), - logger, client, threadPool, action, request, listener); - useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0); + ScriptService scriptService, ActionListener listener) { + super(task, false, true, logger, client, threadPool, action, request, listener); } @Override @@ -60,12 +50,8 @@ protected RequestWrapper buildRequest(ScrollableHitSource.Hit doc delete.index(doc.getIndex()); delete.type(doc.getType()); delete.id(doc.getId()); - if (useSeqNoForCAS) { - delete.setIfSeqNo(doc.getSeqNo()); - delete.setIfPrimaryTerm(doc.getPrimaryTerm()); - } else { - delete.version(doc.getVersion()); - } + delete.setIfSeqNo(doc.getSeqNo()); + delete.setIfPrimaryTerm(doc.getPrimaryTerm()); return wrap(delete); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index 08538b335535d..d7959f0058974 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -61,7 +61,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener buildRequest(ScrollableHitSource.Hit doc) index.type(doc.getType()); index.id(doc.getId()); index.source(doc.getSource(), doc.getXContentType()); - if (useSeqNoForCAS) { - index.setIfSeqNo(doc.getSeqNo()); - index.setIfPrimaryTerm(doc.getPrimaryTerm()); - } else { - index.versionType(VersionType.INTERNAL); - index.version(doc.getVersion()); - } + index.setIfSeqNo(doc.getSeqNo()); + index.setIfPrimaryTerm(doc.getPrimaryTerm()); index.setPipeline(mainRequest.getPipeline()); return wrap(index); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java index 987830ddd3bc9..1c3456fe20c5f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java @@ -67,7 +67,7 @@ public void testUpdateWhileReindexing() throws Exception { IndexRequestBuilder index = client().prepareIndex("test", "test", "test").setSource("test", value.get()) .setRefreshPolicy(IMMEDIATE); /* - * Update by query increments the version number so concurrent + * Update by query changes the document so concurrent * indexes might get version conflict exceptions so we just * blindly retry. */ @@ -75,7 +75,7 @@ public void testUpdateWhileReindexing() throws Exception { while (true) { attempts++; try { - index.setVersion(get.getVersion()).get(); + index.setIfSeqNo(get.getSeqNo()).setIfPrimaryTerm(get.getPrimaryTerm()).get(); break; } catch (VersionConflictEngineException e) { if (attempts >= MAX_ATTEMPTS) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json index 92f1013a317c3..106b29b252ad3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json @@ -70,15 +70,6 @@ "if_primary_term" : { "type" : "number", "description" : "only perform the update operation if the last operation that has changed the document has the specified primary term" - }, - "version": { - "type": "number", - "description": "Explicit version number for concurrency control" - }, - "version_type": { - "type": "enum", - "options": ["internal", "force"], - "description": "Specific version type" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml deleted file mode 100644 index 52e8e464da094..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml +++ /dev/null @@ -1,36 +0,0 @@ ---- -"Internal version": - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - do: - create: - index: test_1 - id: 1 - body: { foo: bar } - - - match: { _version: 1} - - - do: - catch: conflict - create: - index: test_1 - id: 1 - body: { foo: bar } - ---- -"Internal versioning with explicit version": - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - do: - catch: bad_request - create: - index: test - id: 3 - body: { foo: bar } - version: 5 - - - match: { status: 400 } - - match: { error.type: action_request_validation_exception } - - match: { error.reason: "Validation Failed: 1: create operations do not support explicit versions. use index instead;" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml deleted file mode 100644 index 83772828bc8f4..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml +++ /dev/null @@ -1,35 +0,0 @@ ---- -"Internal version": - - - do: - create: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - - match: { _version: 1} - - - do: - catch: conflict - create: - index: test_1 - type: test - id: 1 - body: { foo: bar } - ---- -"Internal versioning with explicit version": - - - do: - catch: bad_request - create: - index: test - type: test - id: 3 - body: { foo: bar } - version: 5 - - - match: { status: 400 } - - match: { error.type: action_request_validation_exception } - - match: { error.reason: "Validation Failed: 1: create operations do not support explicit versions. use index instead;" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_cas.yml similarity index 71% rename from rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_internal_version.yml rename to rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_cas.yml index afe69b4fe82e5..f3c7b0acbcccd 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_internal_version.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_cas.yml @@ -11,19 +11,21 @@ id: 1 body: { foo: bar } - - match: { _version: 1} + - match: { _seq_no: 0 } - do: catch: conflict delete: index: test_1 id: 1 - version: 2 + if_seq_no: 2 + if_primary_term: 1 - do: delete: index: test_1 id: 1 - version: 1 + if_seq_no: 0 + if_primary_term: 1 - - match: { _version: 2 } + - match: { _seq_no: 1 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml new file mode 100644 index 0000000000000..ef352a9bad6b1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml @@ -0,0 +1,30 @@ +--- +"Internal version": + + - do: + index: + index: test_1 + type: test + id: 1 + body: { foo: bar } + + - match: { _seq_no: 0 } + + - do: + catch: conflict + delete: + index: test_1 + type: test + id: 1 + if_seq_no: 2 + if_primary_term: 1 + + - do: + delete: + index: test_1 + type: test + id: 1 + if_seq_no: 0 + if_primary_term: 1 + + - match: { _seq_no: 1 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml deleted file mode 100644 index 3d9ddb79366f7..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml +++ /dev/null @@ -1,28 +0,0 @@ ---- -"Internal version": - - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - - match: { _version: 1} - - - do: - catch: conflict - delete: - index: test_1 - type: test - id: 1 - version: 2 - - - do: - delete: - index: test_1 - type: test - id: 1 - version: 1 - - - match: { _version: 2 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml deleted file mode 100644 index adc4f3f4b15c0..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml +++ /dev/null @@ -1,36 +0,0 @@ ---- -"Internal version": - - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - - do: - index: - index: test_1 - id: 1 - body: { foo: bar } - - match: { _version: 1} - - - do: - index: - index: test_1 - id: 1 - body: { foo: bar } - - match: { _version: 2} - - - do: - catch: conflict - index: - index: test_1 - id: 1 - body: { foo: bar } - version: 1 - - do: - index: - index: test_1 - id: 1 - body: { foo: bar } - version: 2 - - - match: { _version: 3 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml deleted file mode 100644 index 1767fbebbf966..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml +++ /dev/null @@ -1,36 +0,0 @@ ---- -"Internal version": - - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - match: { _version: 1} - - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - match: { _version: 2} - - - do: - catch: conflict - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - version: 1 - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - version: 2 - - - match: { _version: 3 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml deleted file mode 100644 index 7b474d6bc09dc..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml +++ /dev/null @@ -1,31 +0,0 @@ ---- -"Internal version": - - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - - do: - catch: missing - update: - index: test_1 - id: 1 - version: 1 - body: - doc: { foo: baz } - - - do: - index: - index: test_1 - id: 1 - body: - doc: { foo: baz } - - - do: - catch: conflict - update: - index: test_1 - id: 1 - version: 2 - body: - doc: { foo: baz } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml deleted file mode 100644 index 17c4806c693ac..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml +++ /dev/null @@ -1,30 +0,0 @@ ---- -"Internal version": - - - do: - catch: missing - update: - index: test_1 - type: test - id: 1 - version: 1 - body: - doc: { foo: baz } - - - do: - index: - index: test_1 - type: test - id: 1 - body: - doc: { foo: baz } - - - do: - catch: conflict - update: - index: test_1 - type: test - id: 1 - version: 2 - body: - doc: { foo: baz } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml deleted file mode 100644 index 9740aa39edeb3..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml +++ /dev/null @@ -1,29 +0,0 @@ ---- -"Not supported versions": - - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - id: 1 - version: 2 - version_type: external - body: - doc: { foo: baz } - upsert: { foo: bar } - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - id: 1 - version: 2 - version_type: external_gte - body: - doc: { foo: baz } - upsert: { foo: bar } - diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml deleted file mode 100644 index c0ec082b91a4f..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml +++ /dev/null @@ -1,27 +0,0 @@ ---- -"Not supported versions": - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - type: test - id: 1 - version: 2 - version_type: external - body: - doc: { foo: baz } - upsert: { foo: bar } - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - type: test - id: 1 - version: 2 - version_type: external_gte - body: - doc: { foo: baz } - upsert: { foo: bar } - diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index d8a9a3503a617..373dfaa5c7416 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -257,16 +257,23 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) static ActionRequestValidationException validateSeqNoBasedCASParams( DocWriteRequest request, ActionRequestValidationException validationException) { - if (request.versionType().validateVersionForWrites(request.version()) == false) { - validationException = addValidationError("illegal version value [" + request.version() + "] for version type [" - + request.versionType().name() + "]", validationException); + final long version = request.version(); + final VersionType versionType = request.versionType(); + if (versionType.validateVersionForWrites(version) == false) { + validationException = addValidationError("illegal version value [" + version + "] for version type [" + + versionType.name() + "]", validationException); } - if (request.versionType() == VersionType.FORCE) { + if (versionType == VersionType.FORCE) { validationException = addValidationError("version type [force] may no longer be used", validationException); } + if (versionType == VersionType.INTERNAL && version != Versions.MATCH_ANY && version != Versions.MATCH_DELETED) { + validationException = addValidationError("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead", validationException); + } + if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && ( - request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY + versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY )) { validationException = addValidationError("compare and write operations can not use versioning", validationException); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index b5c786ab2df6d..42f569c0a9bda 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -46,9 +46,9 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.action.document.RestBulkAction; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; @@ -501,8 +501,11 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null .create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); } else if ("update".equals(action)) { + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new IllegalArgumentException("Update requests do not support versioning. " + + "Please use `if_seq_no` and `if_primary_term` instead"); + } UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) - .version(version).versionType(versionType) .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .routing(routing); // EMPTY is safe here because we never call namedObject @@ -516,15 +519,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null } IndexRequest upsertRequest = updateRequest.upsertRequest(); if (upsertRequest != null) { - upsertRequest.version(version); - upsertRequest.versionType(versionType); upsertRequest.setPipeline(defaultPipeline); } - IndexRequest doc = updateRequest.doc(); - if (doc != null) { - doc.version(version); - doc.versionType(versionType); - } internalAdd(updateRequest, payload); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 4e9c598ba9c00..96c93c974cabb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -224,7 +224,7 @@ public SearchRequestBuilder setVersion(boolean version) { sourceBuilder().version(version); return this; } - + /** * Should each {@link org.elasticsearch.search.SearchHit} be returned with the * sequence number and primary term of the last modification of the document. diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 8cd6146768fff..54cd38aa0b960 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -70,7 +70,7 @@ public UpdateHelper(ScriptService scriptService) { */ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { final GetResult getResult = indexShard.getService().getForUpdate( - request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm()); + request.type(), request.id(), request.ifSeqNo(), request.ifPrimaryTerm()); return prepare(indexShard.shardId(), request, getResult, nowInMillis); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index c85f73d90ec3d..3693975ddab08 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -108,8 +108,6 @@ public class UpdateRequest extends InstanceShardOperationRequest private FetchSourceContext fetchSourceContext; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; private int retryOnConflict = 0; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; @@ -150,9 +148,6 @@ public UpdateRequest(String index, String type, String id) { @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = super.validate(); - if (version != Versions.MATCH_ANY && upsertRequest != null) { - validationException = addValidationError("can't provide both upsert request and a version", validationException); - } if(upsertRequest != null && upsertRequest.version() != Versions.MATCH_ANY) { validationException = addValidationError("can't provide version in upsert request", validationException); } @@ -163,30 +158,20 @@ public ActionRequestValidationException validate() { validationException = addValidationError("id is missing", validationException); } - if (versionType != VersionType.INTERNAL) { - validationException = addValidationError("version type [" + versionType + "] is not supported by the update API", - validationException); - } else { + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - if (version != Versions.MATCH_ANY && retryOnConflict > 0) { - validationException = addValidationError("can't provide both retry_on_conflict and a specific version", - validationException); + if (ifSeqNo != UNASSIGNED_SEQ_NO) { + if (retryOnConflict > 0) { + validationException = addValidationError("compare and write operations can not be retried", validationException); } - if (!versionType.validateVersionForWrites(version)) { - validationException = addValidationError("illegal version value [" + version + "] for version type [" + - versionType.name() + "]", validationException); + if (docAsUpsert) { + validationException = addValidationError("compare and write operations can not be used with upsert", validationException); + } + if (upsertRequest != null) { + validationException = + addValidationError("upsert requests don't support `if_seq_no` and `if_primary_term`", validationException); } - } - - validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - - if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) { - validationException = addValidationError("compare and write operations can not be retried", validationException); - } - - if (ifSeqNo != UNASSIGNED_SEQ_NO && docAsUpsert) { - validationException = addValidationError("compare and write operations can not be used with upsert", validationException); } if (script == null && doc == null) { @@ -530,24 +515,22 @@ public int retryOnConflict() { @Override public UpdateRequest version(long version) { - this.version = version; - return this; + throw new UnsupportedOperationException("update requests do not support versioning"); } @Override public long version() { - return this.version; + return Versions.MATCH_ANY; } @Override public UpdateRequest versionType(VersionType versionType) { - this.versionType = versionType; - return this; + throw new UnsupportedOperationException("update requests do not support versioning"); } @Override public VersionType versionType() { - return this.versionType; + return VersionType.INTERNAL; } /** @@ -877,8 +860,14 @@ public void readFrom(StreamInput in) throws IOException { upsertRequest.readFrom(in); } docAsUpsert = in.readBoolean(); - version = in.readLong(); - versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().before(Version.V_7_0_0)) { + long version = in.readLong(); + VersionType versionType = VersionType.readFromStream(in); + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new UnsupportedOperationException( + "versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term"); + } + } ifSeqNo = in.readZLong(); ifPrimaryTerm = in.readVLong(); detectNoop = in.readBoolean(); @@ -930,8 +919,10 @@ public void writeTo(StreamOutput out) throws IOException { upsertRequest.writeTo(out); } out.writeBoolean(docAsUpsert); - out.writeLong(version); - out.writeByte(versionType.getValue()); + if (out.getVersion().before(Version.V_7_0_0)) { + out.writeLong(Versions.MATCH_ANY); + out.writeByte(VersionType.INTERNAL.getValue()); + } out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); out.writeBoolean(detectNoop); diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 9fb1cb804946f..3c85fe40c5ba7 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; @@ -102,9 +103,9 @@ private GetResult get(String type, String id, String[] gFields, boolean realtime } } - public GetResult getForUpdate(String type, String id, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm) { - return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, version, versionType, ifSeqNo, ifPrimaryTerm, - FetchSourceContext.FETCH_SOURCE, true); + public GetResult getForUpdate(String type, String id, long ifSeqNo, long ifPrimaryTerm) { + return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, + Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true); } /** diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 463a18ea6b802..804fa61fc53b2 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -83,8 +83,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); - updateRequest.version(RestActions.parseVersion(request)); - updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 75701e0685290..6d3e4c04c13d7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -311,7 +311,7 @@ public void testSmileIsSupported() throws IOException { assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } - public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOException { + public void testToValidateUpsertRequestAndCASInBulkRequest() throws IOException { XContentType xContentType = XContentType.SMILE; BytesReference data; try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -321,7 +321,8 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept builder.field("_index", "index"); builder.field("_type", "type"); builder.field("_id", "id"); - builder.field("version", 1L); + builder.field("if_seq_no", 1L); + builder.field("if_primary_term", 100L); builder.endObject(); builder.endObject(); } @@ -330,7 +331,8 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept builder.startObject(); builder.startObject("doc").endObject(); Map values = new HashMap<>(); - values.put("version", 2L); + values.put("if_seq_no", 1L); + values.put("if_primary_term", 100L); values.put("_index", "index"); values.put("_type", "type"); builder.field("upsert", values); @@ -341,8 +343,7 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept } BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(data, null, null, xContentType); - assertThat(bulkRequest.validate().validationErrors(), contains("can't provide both upsert request and a version", - "can't provide version in upsert request")); + assertThat(bulkRequest.validate().validationErrors(), contains("upsert requests don't support `if_seq_no` and `if_primary_term`")); //This test's JSON contains outdated references to types assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 277c130cebb1b..f74137d4a418d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -195,8 +196,8 @@ public void testBulkUpdateSimple() throws Exception { assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L)); } - public void testBulkVersioning() throws Exception { - createIndex("test"); + public void testBulkWithCAS() throws Exception { + createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build()); ensureGreen(); BulkResponse bulkResponse = client().prepareBulk() .add(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field", "1")) @@ -204,20 +205,22 @@ public void testBulkVersioning() throws Exception { .add(client().prepareIndex("test", "type", "1").setSource("field", "2")).get(); assertEquals(DocWriteResponse.Result.CREATED, bulkResponse.getItems()[0].getResponse().getResult()); - assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[0].getResponse().getSeqNo(), equalTo(0L)); assertEquals(DocWriteResponse.Result.CREATED, bulkResponse.getItems()[1].getResponse().getResult()); - assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[1].getResponse().getSeqNo(), equalTo(1L)); assertEquals(DocWriteResponse.Result.UPDATED, bulkResponse.getItems()[2].getResponse().getResult()); - assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(2L)); + assertThat(bulkResponse.getItems()[2].getResponse().getSeqNo(), equalTo(2L)); bulkResponse = client().prepareBulk() - .add(client().prepareUpdate("test", "type", "1").setVersion(4L).setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2")) + .add(client().prepareUpdate("test", "type", "1").setIfSeqNo(40L).setIfPrimaryTerm(20) + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2")) .add(client().prepareUpdate("test", "type", "2").setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2")) - .add(client().prepareUpdate("test", "type", "1").setVersion(2L).setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3")).get(); + .add(client().prepareUpdate("test", "type", "1").setIfSeqNo(2L).setIfPrimaryTerm(1) + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3")).get(); assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict")); - assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L)); - assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(3L)); + assertThat(bulkResponse.getItems()[1].getResponse().getSeqNo(), equalTo(3L)); + assertThat(bulkResponse.getItems()[2].getResponse().getSeqNo(), equalTo(4L)); bulkResponse = client().prepareBulk() .add(client().prepareIndex("test", "type", "e1") @@ -237,9 +240,9 @@ public void testBulkVersioning() throws Exception { bulkResponse = client().prepareBulk() .add(client().prepareUpdate("test", "type", "e1") - .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2").setVersion(10)) // INTERNAL + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2").setIfSeqNo(10L).setIfPrimaryTerm(1)) .add(client().prepareUpdate("test", "type", "e1") - .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3").setVersion(13).setVersionType(VersionType.INTERNAL)) + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3").setIfSeqNo(20L).setIfPrimaryTerm(1)) .get(); assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict")); @@ -471,7 +474,7 @@ public void testFailingVersionedUpdatedOnBulk() throws Exception { return; } BulkRequestBuilder requestBuilder = client().prepareBulk(); - requestBuilder.add(client().prepareUpdate("test", "type", "1").setVersion(1) + requestBuilder.add(client().prepareUpdate("test", "type", "1").setIfSeqNo(0L).setIfPrimaryTerm(1) .setDoc(Requests.INDEX_CONTENT_TYPE, "field", threadID)); responses[threadID] = requestBuilder.get(); diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index 5a734352eafb2..642d14e2258cb 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -500,12 +500,14 @@ public void testToAndFromXContent() throws IOException { assertToXContentEquivalent(originalBytes, finalBytes, xContentType); } - public void testToValidateUpsertRequestAndVersion() { + public void testToValidateUpsertRequestAndCAS() { UpdateRequest updateRequest = new UpdateRequest("index", "type", "id"); - updateRequest.version(1L); + updateRequest.setIfSeqNo(1L); + updateRequest.setIfPrimaryTerm(1L); updateRequest.doc("{}", XContentType.JSON); updateRequest.upsert(new IndexRequest("index","type", "id")); - assertThat(updateRequest.validate().validationErrors(), contains("can't provide both upsert request and a version")); + assertThat(updateRequest.validate().validationErrors(), + contains("upsert requests don't support `if_seq_no` and `if_primary_term`")); } public void testToValidateUpsertRequestWithVersion() { diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 496221ca9fc4e..5492b8bf7c672 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -31,7 +30,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import static org.elasticsearch.common.lucene.uid.Versions.MATCH_ANY; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -51,8 +49,7 @@ public void testGetForUpdate() throws IOException { recoverShardFromStore(primary); Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet = primary.getService().getForUpdate( - "test", "0", test.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { @@ -61,8 +58,7 @@ public void testGetForUpdate() throws IOException { Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet1 = primary.getService().getForUpdate( - "test", "1", test1.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + GetResult testGet1 = primary.getService().getForUpdate("test", "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); @@ -77,20 +73,19 @@ public void testGetForUpdate() throws IOException { // now again from the reader Engine.IndexResult test2 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - testGet1 = primary.getService().getForUpdate("test", "1", test2.getVersion(), VersionType.INTERNAL, - UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + testGet1 = primary.getService().getForUpdate("test", "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); final long primaryTerm = primary.getOperationPrimaryTerm(); - testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm); + testGet1 = primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); expectThrows(VersionConflictEngineException.class, () -> - primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo() + 1, primaryTerm)); + primary.getService().getForUpdate("test", "1", test2.getSeqNo() + 1, primaryTerm)); expectThrows(VersionConflictEngineException.class, () -> - primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm + 1)); + primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm + 1)); closeShards(primary); } @@ -108,16 +103,13 @@ public void testTypelessGetForUpdate() throws IOException { Engine.IndexResult indexResult = indexDoc(shard, "some_type", "0", "{\"foo\" : \"bar\"}"); assertTrue(indexResult.isCreated()); - GetResult getResult = shard.getService().getForUpdate( - "some_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + GetResult getResult = shard.getService().getForUpdate("some_type", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); - getResult = shard.getService().getForUpdate( - "some_other_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + getResult = shard.getService().getForUpdate("some_other_type", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(getResult.isExists()); - getResult = shard.getService().getForUpdate( - "_doc", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + getResult = shard.getService().getForUpdate("_doc", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index fb3eac28b6793..d749ce367cf0b 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Setting; @@ -436,17 +437,20 @@ public void testOpenCloseUpdateSettings() throws Exception { public void testEngineGCDeletesSetting() throws InterruptedException { createIndex("test"); - client().prepareIndex("test", "type", "1").setSource("f", 1).get(); // set version to 1 - client().prepareDelete("test", "type", "1").get(); // sets version to 2 - // delete is still in cache this should work & set version to 3 - client().prepareIndex("test", "type", "1").setSource("f", 2).setVersion(2).get(); + client().prepareIndex("test", "type", "1").setSource("f", 1).get(); + DeleteResponse response = client().prepareDelete("test", "type", "1").get(); + long seqNo = response.getSeqNo(); + long primaryTerm = response.getPrimaryTerm(); + // delete is still in cache this should work + client().prepareIndex("test", "type", "1").setSource("f", 2).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm).get(); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.gc_deletes", 0)).get(); - client().prepareDelete("test", "type", "1").get(); // sets version to 4 + response = client().prepareDelete("test", "type", "1").get(); + seqNo = response.getSeqNo(); Thread.sleep(300); // wait for cache time to change TODO: this needs to be solved better. To be discussed. // delete is should not be in cache - assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3) - .setVersion(4), VersionConflictEngineException.class); + assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm), + VersionConflictEngineException.class); } diff --git a/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java index f562ace967820..ffd876383ad9e 100644 --- a/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java +++ b/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java @@ -213,11 +213,11 @@ public void testRequireUnitsOnUpdateSettings() throws Exception { } } - public void testInternalVersioningInitialDelete() throws Exception { + public void testCompareAndSetInitialDelete() throws Exception { createIndex("test"); ensureGreen(); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(17).execute(), + assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(17).setIfPrimaryTerm(10).execute(), VersionConflictEngineException.class); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1") @@ -225,63 +225,6 @@ public void testInternalVersioningInitialDelete() throws Exception { assertThat(indexResponse.getVersion(), equalTo(1L)); } - public void testInternalVersioning() throws Exception { - createIndex("test"); - ensureGreen(); - - IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(1L)); - - indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(2L)); - - assertThrows( - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), - VersionConflictEngineException.class); - - assertThrows( - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), - VersionConflictEngineException.class); - - assertThrows( - client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(), - VersionConflictEngineException.class); - - - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); - - client().admin().indices().prepareRefresh().execute().actionGet(); - for (int i = 0; i < 10; i++) { - assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L)); - } - - // search with versioning - for (int i = 0; i < 10; i++) { - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).execute().actionGet(); - assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L)); - } - - // search without versioning - for (int i = 0; i < 10; i++) { - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); - assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(Versions.NOT_FOUND)); - } - - DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - assertThat(deleteResponse.getVersion(), equalTo(3L)); - - assertThrows(client().prepareDelete("test", "type", "1").setVersion(2).execute(), VersionConflictEngineException.class); - - - // This is intricate - the object was deleted but a delete transaction was with the right version. We add another one - // and thus the transaction is increased. - deleteResponse = client().prepareDelete("test", "type", "1").setVersion(3).execute().actionGet(); - assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult()); - assertThat(deleteResponse.getVersion(), equalTo(4L)); - } - public void testCompareAndSet() { createIndex("test"); ensureGreen(); @@ -290,7 +233,7 @@ public void testCompareAndSet() { assertThat(indexResponse.getSeqNo(), equalTo(0L)); assertThat(indexResponse.getPrimaryTerm(), equalTo(1L)); - indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setIfSeqNo(0L).setIfPrimaryTerm(1).get(); assertThat(indexResponse.getSeqNo(), equalTo(1L)); assertThat(indexResponse.getPrimaryTerm(), equalTo(1L)); @@ -353,25 +296,21 @@ public void testSimpleVersioningWithFlush() throws Exception { createIndex("test"); ensureGreen(); - IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(1L)); + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").get(); + assertThat(indexResponse.getSeqNo(), equalTo(0L)); client().admin().indices().prepareFlush().execute().actionGet(); - indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(2L)); + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setIfSeqNo(0).setIfPrimaryTerm(1).get(); + assertThat(indexResponse.getSeqNo(), equalTo(1L)); client().admin().indices().prepareFlush().execute().actionGet(); - assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), - VersionConflictEngineException.class); - - assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), + assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfSeqNo(0).setIfPrimaryTerm(1), VersionConflictEngineException.class); - assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(), + assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1"), VersionConflictEngineException.class); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(0).setIfPrimaryTerm(1), VersionConflictEngineException.class); for (int i = 0; i < 10; i++) { assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L)); @@ -380,10 +319,11 @@ public void testSimpleVersioningWithFlush() throws Exception { client().admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true). + SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).seqNoAndPrimaryTerm(true). execute().actionGet(); assertHitCount(searchResponse, 1); assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L)); + assertThat(searchResponse.getHits().getAt(0).getSeqNo(), equalTo(1L)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index bfe0cdef41596..ac4f435da130d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -495,7 +495,7 @@ public void onTimeout(TimeValue timeout) { private void clearJobFinishedTime(String jobId, ActionListener listener) { JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( + jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( job -> listener.onResponse(new AcknowledgedResponse(true)), e -> { logger.error("[" + jobId + "] Failed to clear finished_time", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 443e66b81e785..09a8f219afcf4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -87,7 +87,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat CheckedConsumer updateConsumer = ok -> { datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers, - jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(), + jobConfigProvider::validateDatafeedJob, ActionListener.wrap( updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)), listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java index d8d5fe216b2a4..fe5ae7eb6e8bf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; @@ -54,7 +53,6 @@ public class TransportUpdateFilterAction extends HandledTransportAction) UpdateFilterAction.Request::new); this.client = client; this.jobManager = jobManager; - this.clusterService = clusterService; } @Override protected void doExecute(Task task, UpdateFilterAction.Request request, ActionListener listener) { - ActionListener filterListener = ActionListener.wrap(filterWithVersion -> { + ActionListener filterListener = ActionListener.wrap(filterWithVersion -> { updateFilter(filterWithVersion, request, listener); }, listener::onFailure); getFilterWithVersion(request.getFilterId(), filterListener); } - private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterAction.Request request, + private void updateFilter(FilterWithSeqNo filterWithVersion, UpdateFilterAction.Request request, ActionListener listener) { MlFilter filter = filterWithVersion.filter; @@ -100,19 +97,15 @@ private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterActio MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build(); indexUpdatedFilter( - updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener); + updatedFilter, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener); } - private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm, + private void indexUpdatedFilter(MlFilter filter, final long seqNo, final long primaryTerm, UpdateFilterAction.Request request, ActionListener listener) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId()); - if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) { - indexRequest.setIfSeqNo(seqNo); - indexRequest.setIfPrimaryTerm(primaryTerm); - } else { - indexRequest.version(version); - } + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { @@ -145,7 +138,7 @@ public void onFailure(Exception e) { }); } - private void getFilterWithVersion(String filterId, ActionListener listener) { + private void getFilterWithVersion(String filterId, ActionListener listener) { GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId)); executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { @Override @@ -157,7 +150,7 @@ public void onResponse(GetResponse getDocResponse) { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build(); - listener.onResponse(new FilterWithVersion(filter, getDocResponse)); + listener.onResponse(new FilterWithSeqNo(filter, getDocResponse)); } } else { this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId))); @@ -174,16 +167,14 @@ public void onFailure(Exception e) { }); } - private static class FilterWithVersion { + private static class FilterWithSeqNo { private final MlFilter filter; - private final long version; private final long seqNo; private final long primaryTerm; - private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) { + private FilterWithSeqNo(MlFilter filter, GetResponse getDocResponse) { this.filter = filter; - this.version = getDocResponse.getVersion(); this.seqNo = getDocResponse.getSeqNo(); this.primaryTerm = getDocResponse.getPrimaryTerm(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 7d11173e258b1..7237ab0eb9818 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -8,7 +8,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -264,13 +263,11 @@ public void onFailure(Exception e) { * @param headers Datafeed headers applied with the update * @param validator BiConsumer that accepts the updated config and can perform * extra validations. {@code validator} must call the passed listener - * @param minClusterNodeVersion minimum version of nodes in cluster * @param updatedConfigListener Updated datafeed config listener */ public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map headers, - BiConsumer> validator, - Version minClusterNodeVersion, - ActionListener updatedConfigListener) { + BiConsumer> validator, + ActionListener updatedConfigListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); @@ -304,7 +301,7 @@ public void onResponse(GetResponse getResponse) { ActionListener validatedListener = ActionListener.wrap( ok -> { - indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap( + indexUpdatedConfig(updatedConfig, seqNo, primaryTerm, ActionListener.wrap( indexResponse -> { assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; updatedConfigListener.onResponse(updatedConfig); @@ -324,8 +321,8 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm, - Version minClusterNodeVersion, ActionListener listener) { + private void indexUpdatedConfig(DatafeedConfig updatedConfig, long seqNo, long primaryTerm, + ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), @@ -333,12 +330,8 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long .setSource(updatedSource) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { - indexRequest.setIfSeqNo(seqNo); - indexRequest.setIfPrimaryTerm(primaryTerm); - } else { - indexRequest.setVersion(version); - } + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 6696bfe1ad96a..ccd0d594eb382 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -333,7 +333,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener { jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit, - this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( + this::validate, ActionListener.wrap( updatedJob -> postJobUpdate(request, updatedJob, actionListener), actionListener::onFailure )); @@ -603,7 +603,7 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList .setModelSnapshotId(modelSnapshot.getSnapshotId()) .build(); - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(), + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap(job -> { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index e5ee8855969a3..9423768b8ed4f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -10,7 +10,6 @@ import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -227,11 +226,9 @@ public void onFailure(Exception e) { * @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null} * if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits} * are not changed. - * @param minClusterNodeVersion the minimum version of nodes in the cluster * @param updatedJobListener Updated job listener */ public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - Version minClusterNodeVersion, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -266,7 +263,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); + indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener); } @Override @@ -287,18 +284,17 @@ public interface UpdateValidator { } /** - * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but + * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but * with an extra validation step which is called before the updated is applied. * * @param jobId The Id of the job to update * @param update The job update * @param maxModelMemoryLimit The maximum model memory allowed * @param validator The job update validator - * @param minClusterNodeVersion the minimum version of a node ifn the cluster * @param updatedJobListener Updated job listener */ public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - UpdateValidator validator, Version minClusterNodeVersion, ActionListener updatedJobListener) { + UpdateValidator validator, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -334,7 +330,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); + indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener); }, updatedJobListener::onFailure )); @@ -347,7 +343,7 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion, + private void indexUpdatedJob(Job updatedJob, long seqNo, long primaryTerm, ActionListener updatedJobListener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -355,12 +351,8 @@ private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long prim ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId())) .setSource(updatedSource) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { - indexRequest.setIfSeqNo(seqNo); - indexRequest.setIfPrimaryTerm(primaryTerm); - } else { - indexRequest.setVersion(version); - } + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap( indexResponse -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 00d62b7e0a933..9496f4ca0d8f2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -7,7 +7,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; @@ -87,7 +86,7 @@ public void testCrud() throws InterruptedException { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), configHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(configHolder.get().getIndices(), equalTo(updateIndices)); @@ -168,7 +167,7 @@ public void testUpdateWhenApplyingTheUpdateThrows() throws Exception { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); assertNotNull(exceptionHolder.get()); @@ -194,7 +193,7 @@ public void testUpdateWithValidatorFunctionThatErrors() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - validateErrorFunction, Version.CURRENT, actionListener), + validateErrorFunction, actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 3e20bdd73de07..f6ff80edeec02 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -8,7 +8,6 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; @@ -149,7 +148,7 @@ public void testCrud() throws InterruptedException { AtomicReference updateJobResponseHolder = new AtomicReference<>(); blockingCall(actionListener -> jobConfigProvider.updateJob - (jobId, jobUpdate, new ByteSizeValue(32), Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); + (jobId, jobUpdate, new ByteSizeValue(32), actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription()); @@ -206,7 +205,7 @@ public void testUpdateWithAValidationError() throws Exception { .build(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), Version.CURRENT, + blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); assertNotNull(exceptionHolder.get()); @@ -231,7 +230,7 @@ public void testUpdateWithValidator() throws Exception { AtomicReference updateJobResponseHolder = new AtomicReference<>(); // update with the no-op validator blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation( - jobId, jobUpdate, new ByteSizeValue(32), validator, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); + jobId, jobUpdate, new ByteSizeValue(32), validator, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertNotNull(updateJobResponseHolder.get()); @@ -244,7 +243,7 @@ public void testUpdateWithValidator() throws Exception { updateJobResponseHolder.set(null); // Update with a validator that errors blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), - validatorWithAnError, Version.CURRENT, actionListener), + validatorWithAnError, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index 522b1a6d4b97a..8b9fda5b9c3f3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -750,12 +750,8 @@ private void innerRefresh(String tokenDocId, Authentication userAuth, ActionList client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId) .setDoc("refresh_token", Collections.singletonMap("refreshed", true)) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) { - updateRequest.setIfSeqNo(response.getSeqNo()); - updateRequest.setIfPrimaryTerm(response.getPrimaryTerm()); - } else { - updateRequest.setVersion(response.getVersion()); - } + updateRequest.setIfSeqNo(response.getSeqNo()); + updateRequest.setIfPrimaryTerm(response.getPrimaryTerm()); executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(), ActionListener.wrap( updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true), diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml index 74b3a20f5cff8..ebef6c87d7022 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml @@ -115,112 +115,6 @@ setup: } ---- -"Test putting a watch with a redacted password with old version returns an error": - - # version 1 - - do: - xpack.watcher.put_watch: - id: "watch_old_version" - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "pass" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - # version 2 - - do: - xpack.watcher.put_watch: - id: "watch_old_version" - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "pass" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - - # using optimistic concurrency control, this one will loose - # as if two users in the watch UI tried to update the same watch - - do: - catch: conflict - xpack.watcher.put_watch: - id: "watch_old_version" - version: 1 - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "::es_redacted::" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - --- "Test putting a watch with a redacted password with old seq no returns an error": - skip: @@ -390,98 +284,3 @@ setup: - match: { hits.hits.0._source.input.http.request.auth.basic.username: "new_user" } - match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" } ---- -"Test putting a watch with a redacted password with current version works": - - - do: - xpack.watcher.put_watch: - id: "my_watch_with_version" - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "pass" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - - match: { _id: "my_watch_with_version" } - - match: { _version: 1 } - - # this resembles the exact update from the UI and thus should work, no password change, any change in the watch - # but correct version provided - - do: - xpack.watcher.put_watch: - id: "my_watch_with_version" - version: 1 - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "::es_redacted::" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - - match: { _id: "my_watch_with_version" } - - match: { _version: 2 } - - - do: - search: - rest_total_hits_as_int: true - index: .watches - body: > - { - "query": { - "term": { - "_id": { - "value": "my_watch_with_version" - } - } - } - } - - - match: { hits.total: 1 } - - match: { hits.hits.0._id: "my_watch_with_version" } - - match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" } -