Merged
Changes from 29 commits (51 commits total)

Commits
44b87e5
Remove internal versioning as CAS
bleskes Jan 28, 2019
7ef53e9
fix upsert + seq numbers
bleskes Feb 1, 2019
0db0dd9
Merge branch 'master' into cas_remove_internal_versioning
bleskes Feb 2, 2019
16cc5a6
remove version params from the update rest api
bleskes Feb 2, 2019
9c18658
lint
bleskes Feb 2, 2019
11834f8
move ml to seq no cas
bleskes Feb 3, 2019
1622b47
Merge branch 'cas_job_config' into cas_remove_internal_versioning
bleskes Feb 3, 2019
a2bf755
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 3, 2019
d7564a0
fix testEngineGCDeletesSetting
bleskes Feb 3, 2019
a9f47c9
fix SimpleVersioningIT
bleskes Feb 3, 2019
fc97c1a
fix BulkWithUpdatesIT
bleskes Feb 3, 2019
3617f06
move DatafeedConfigProvider to seq No
bleskes Feb 3, 2019
a833522
Merge branch 'cas_job_config' into cas_remove_internal_versioning
bleskes Feb 3, 2019
de81b45
remove watch test with versions
bleskes Feb 3, 2019
575267d
fix testFailingVersionedUpdatedOnBulk
bleskes Feb 4, 2019
0852123
remove "Test putting a watch with a redacted password with current ve…
bleskes Feb 4, 2019
24f92bc
Move TokenService to seqno powered cas
bleskes Feb 4, 2019
820e055
Merge branch 'cas_token_service' into cas_remove_internal_versioning
bleskes Feb 4, 2019
4c00bd6
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 4, 2019
96c9bc1
remove duplicate method
bleskes Feb 4, 2019
03163ce
add docs
bleskes Feb 4, 2019
914da06
force one shard in testBulkWithCAS
bleskes Feb 4, 2019
1b00d76
fix doc reference
bleskes Feb 4, 2019
7974e3c
remove version yml tests for updates
bleskes Feb 4, 2019
d843842
remove version usage in TokenService
bleskes Feb 4, 2019
2690b88
remove version usage in TransportUpdateFilterAction
bleskes Feb 4, 2019
aa3d01b
remove cluster checks and potential usages of versions in ml
bleskes Feb 4, 2019
112af69
merge master
bleskes Feb 4, 2019
2c0ca27
remove duplicates post merge
bleskes Feb 4, 2019
515f95f
add create + update if_seq_no yml tests
bleskes Feb 4, 2019
7124c9f
remove rest internal version tests
bleskes Feb 4, 2019
f2bc6f7
fix documentation + rollback resiliency
bleskes Feb 4, 2019
0dddeb1
fix WatcherRequestConvertersTests.testPutWatch
bleskes Feb 4, 2019
e31f826
E -> e
bleskes Feb 4, 2019
892dead
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 4, 2019
c61a394
fix testUpdateWhileReindexing
bleskes Feb 5, 2019
e8aa73a
fix RequestConvertersTests.testBulk
bleskes Feb 5, 2019
313ec1a
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 5, 2019
a46a763
remove version updates from RequestConvertersTests.testUpdate
bleskes Feb 5, 2019
7e35372
Wire if_seq_no and if_primary_term in rest client bulk
bleskes Feb 5, 2019
2df14d4
wire index and delete
bleskes Feb 5, 2019
949ddb9
feedback
bleskes Feb 5, 2019
621457f
lint
bleskes Feb 5, 2019
74d6a9f
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 5, 2019
99f3fd2
merge cas_bulk_rest
bleskes Feb 5, 2019
6d81066
fix CRUDDocumentationIT
bleskes Feb 5, 2019
57bafed
update requests don't have parameters in url
bleskes Feb 5, 2019
f2a74d3
line lengths, again
bleskes Feb 5, 2019
0239d66
merge cas_bulk_rest
bleskes Feb 5, 2019
208211a
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 5, 2019
2c81f7e
merge master
bleskes Feb 5, 2019
@@ -70,7 +70,6 @@ static Request putWatch(PutWatchRequest putWatchRequest) {

Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
@@ -21,7 +21,6 @@
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;

@@ -43,11 +42,9 @@ public final class PutWatchRequest implements Validatable {
private final BytesReference source;
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
if (isValidId(id) == false) {
@@ -95,14 +92,6 @@ public XContentType xContentType() {
return xContentType;
}

public long getVersion() {
return version;
}

public void setVersion(long version) {
this.version = version;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
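For example, a small usage sketch of the two setters together (the watch source and the sequence number/primary term of the last modification are hypothetical placeholders):

BytesReference watchSource = new BytesArray("{}"); // hypothetical watch definition
PutWatchRequest request = new PutWatchRequest("my_watch_id", watchSource, XContentType.JSON);
// only store the watch if it has not been modified since it was last read
request.setIfSeqNo(42);      // seq no observed on the last read
request.setIfPrimaryTerm(1); // primary term observed on the last read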
@@ -29,8 +29,8 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.ExecuteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
@@ -88,9 +88,11 @@ public void testPutWatch() throws Exception {
}

if (randomBoolean()) {
long version = randomLongBetween(10, 100);
putWatchRequest.setVersion(version);
expectedParams.put("version", String.valueOf(version));
long seqNo = randomNonNegativeLong();
long ifPrimaryTerm = randomLongBetween(1, 200);
putWatchRequest.setIfSeqNo(seqNo);
putWatchRequest.setIfPrimaryTerm(ifPrimaryTerm);
expectedParams.put("if_seq_no", String.valueOf(seqNo));
expectedParams.put("if_primary_term", String.valueOf(ifPrimaryTerm));
}

Request request = WatcherRequestConverters.putWatch(putWatchRequest);
16 changes: 16 additions & 0 deletions docs/reference/migration/migrate_7_0/api.asciidoc
@@ -2,6 +2,22 @@
[[breaking_70_api_changes]]
=== API changes

[float]
==== Internal Versioning is no longer supported for optimistic concurrency control

Elasticsearch maintains a numeric version field for each document it stores. That field
is incremented by one with every change to the document. Until 7.0.0 the API allowed using
that field for optimistic concurrency control, i.e., making a write operation conditional
on the current document version. Sadly, that approach is flawed because the value of the
version doesn't always uniquely represent a change to the document. If a primary fails
while handling a write operation, it may expose a version that will then be reused by the
new primary.

Due to that issue, internal versioning can no longer be used and is replaced by a new
method based on sequence numbers. See <<optimistic-concurrency-control>> for more details.

Note that the `EXTERNAL` versioning type is still fully supported.
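For illustration, a minimal sketch of the new sequence-number based condition using the Java `IndexRequest` setters wired up in this change (the index name, document id, and the previously observed sequence number and primary term are hypothetical):

// seqNo and primaryTerm as returned by a previous GET or search hit for this document
long seqNo = 5;       // hypothetical
long primaryTerm = 1; // hypothetical
IndexRequest request = new IndexRequest("my-index", "_doc", "1")
        .source("field", "updated-value")
        // the write is applied only if the document has not changed since it was read
        .setIfSeqNo(seqNo)
        .setIfPrimaryTerm(primaryTerm);

On the REST layer the same condition is expressed with the `if_seq_no` and `if_primary_term` request parameters, which replace the removed `version` parameter for this purpose.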

[float]
==== Camel case and underscore parameters deprecated in 6.x have been removed
A number of duplicate parameters deprecated in 6.x have been removed from
28 changes: 14 additions & 14 deletions docs/resiliency/index.asciidoc
@@ -102,20 +102,6 @@ space. The following issues have been identified:

Other safeguards are tracked in the meta-issue {GIT}11511[#11511].

[float]
=== The _version field may not uniquely identify document content during a network partition (STATUS: ONGOING)

When a primary has been partitioned away from the cluster there is a short period of time until it detects this. During that time it will continue
indexing writes locally, thereby updating document versions. When it tries to replicate the operation, however, it will discover that it is
partitioned away. It won't acknowledge the write and will wait until the partition is resolved to negotiate with the master on how to proceed.
The master will decide to either fail any replicas which failed to index the operations on the primary or tell the primary that it has to
step down because a new primary has been chosen in the meantime. Since the old primary has already written documents, clients may already have read from
the old primary before it shuts itself down. The version numbers of these reads may not be unique if the new primary has already accepted
writes for the same document (see {GIT}19269[#19269]).

We are currently implementing Sequence numbers {GIT}10708[#10708] which better track primary changes. Sequence numbers thus provide a basis
for uniquely identifying writes even in the presence of network partitions and will replace `_version` in operations that require this.

[float]
=== Relocating shards omitted by reporting infrastructure (STATUS: ONGOING)

@@ -154,6 +140,20 @@ shard.

== Completed

[float]
=== The _version field may not uniquely identify document content during a network partition (STATUS: DONE, v7.0.0)

When a primary has been partitioned away from the cluster there is a short period of time until it detects this. During that time it will continue
indexing writes locally, thereby updating document versions. When it tries to replicate the operation, however, it will discover that it is
partitioned away. It won't acknowledge the write and will wait until the partition is resolved to negotiate with the master on how to proceed.
The master will decide to either fail any replicas which failed to index the operations on the primary or tell the primary that it has to
step down because a new primary has been chosen in the meantime. Since the old primary has already written documents, clients may already have read from
the old primary before it shuts itself down. The version numbers of these reads may not be unique if the new primary has already accepted
writes for the same document (see {GIT}19269[#19269]).

Sequence numbers {GIT}10708[#10708] better track primary changes and thus provide a basis
for uniquely identifying writes even in the presence of network partitions. As of 7.0.0 they replace `_version` in operations that require this.

[float]
=== Repeated network partitions can cause cluster state updates to be lost (STATUS: DONE, v7.0.0)

@@ -20,11 +20,9 @@
package org.elasticsearch.index.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

@@ -33,18 +31,10 @@
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {

private final boolean useSeqNoForCAS;

public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, action, request, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
super(task, false, true, logger, client, threadPool, action, request, listener);
}

@Override
@@ -60,12 +50,8 @@ protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc
delete.index(doc.getIndex());
delete.type(doc.getType());
delete.id(doc.getId());
if (useSeqNoForCAS) {
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
delete.version(doc.getVersion());
}
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
return wrap(delete);
}

@@ -61,7 +61,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
ClusterState state = clusterService.state();
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService,
listener).start();
}
);
@@ -31,7 +31,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
@@ -113,13 +112,8 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
index.type(doc.getType());
index.id(doc.getId());
index.source(doc.getSource(), doc.getXContentType());
if (useSeqNoForCAS) {
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
}
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
index.setPipeline(mainRequest.getPipeline());
return wrap(index);
}
@@ -70,15 +70,6 @@
"if_primary_term" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"version": {
"type": "number",
"description": "Explicit version number for concurrency control"
},
"version_type": {
"type": "enum",
"options": ["internal", "force"],
"description": "Specific version type"
}
}
},

This file was deleted.

This file was deleted.

17 changes: 12 additions & 5 deletions server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
@@ -257,16 +257,23 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)

static ActionRequestValidationException validateSeqNoBasedCASParams(
DocWriteRequest request, ActionRequestValidationException validationException) {
if (request.versionType().validateVersionForWrites(request.version()) == false) {
validationException = addValidationError("illegal version value [" + request.version() + "] for version type ["
+ request.versionType().name() + "]", validationException);
final long version = request.version();
final VersionType versionType = request.versionType();
if (versionType.validateVersionForWrites(version) == false) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["
+ versionType.name() + "]", validationException);
}
if (request.versionType() == VersionType.FORCE) {
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}

if (versionType == VersionType.INTERNAL && version != Versions.MATCH_ANY && version != Versions.MATCH_DELETED) {
validationException = addValidationError("internal versioning can not be used for optimistic concurrency control. " +
"Please use `if_seq_no` and `if_primary_term` instead", validationException);
}

if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && (
request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
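As a sketch of what this validation accepts and rejects (hypothetical requests; assuming, as elsewhere in this change, that the write requests route their validation through this method):

// rejected: internal version used as a CAS condition
IndexRequest stale = new IndexRequest("index", "_doc", "1")
        .source("field", "value")
        .version(3);
assert stale.validate() != null;

// accepted: sequence-number based condition
IndexRequest conditional = new IndexRequest("index", "_doc", "1")
        .source("field", "value")
        .setIfSeqNo(3)
        .setIfPrimaryTerm(1);
assert conditional.validate() == null;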
@@ -46,9 +46,9 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
@@ -501,8 +501,11 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException("Update requests do not support versioning. " +
"Please use `if_seq_no` and `if_primary_term` instead");
}
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.routing(routing);
// EMPTY is safe here because we never call namedObject
@@ -516,15 +519,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.version(version);
upsertRequest.versionType(versionType);
upsertRequest.setPipeline(defaultPipeline);
}
IndexRequest doc = updateRequest.doc();
if (doc != null) {
doc.version(version);
doc.versionType(versionType);
}

internalAdd(updateRequest, payload);
}
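A sketch of the replacement pattern when building a conditional update for a bulk request through the Java API (the index, id, and CAS values are hypothetical):

BulkRequest bulk = new BulkRequest();
bulk.add(new UpdateRequest("index", "_doc", "1")
        .doc("counter", 2)      // partial document to apply
        .setIfSeqNo(10)         // hypothetical sequence number from the last read
        .setIfPrimaryTerm(1));  // hypothetical primary term from the last read

A bulk action line that still carries `version` or `version_type` on an `update` action is rejected with the `IllegalArgumentException` above.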
@@ -224,7 +224,7 @@ public SearchRequestBuilder setVersion(boolean version) {
sourceBuilder().version(version);
return this;
}

/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.
@@ -70,7 +70,7 @@ public UpdateHelper(ScriptService scriptService) {
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
final GetResult getResult = indexShard.getService().getForUpdate(
request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm());
request.type(), request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}
