Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;

Expand Down Expand Up @@ -253,15 +254,9 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) thr
}
}

static void logDeprecationWarnings(DocWriteRequest request, DeprecationLogger logger) {
if (request.versionType() == VersionType.INTERNAL &&
request.version() != Versions.MATCH_ANY &&
request.version() != Versions.MATCH_DELETED) {
logger.deprecatedAndMaybeLog("occ_internal_version",
"Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use" +
" the `if_seq_no` and `if_primary_term` parameters instead. (request for index [{}], type [{}], id [{}])",
request.index(), request.type(), request.id());
}
/** Tests if the cluster is ready for compare and write using sequence numbers. */
static boolean canUseIfSeqNo(ClusterState clusterState) {
return clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_6_0);
}

static ActionRequestValidationException validateSeqNoBasedCASParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public static final String ACTION_NAME = BulkAction.NAME + "[s]";

private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(logger);

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
Expand Down Expand Up @@ -136,35 +140,37 @@ public void onTimeout(TimeValue timeout) {
});
waitingFuture.get();
};
return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis,
new ConcreteMappingUpdatePerformer(), waitForMappingUpdate);
return performOnPrimary(request, primary, updateHelper, DocWriteRequest.canUseIfSeqNo(clusterService.state()),
threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer(), waitForMappingUpdate);
}

public static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
BulkShardRequest request,
IndexShard primary,
UpdateHelper updateHelper,
boolean canUseIfSeqNo,
LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater,
CheckedRunnable<Exception> waitForMappingUpdate) throws Exception {
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);
return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
return performOnPrimary(context, updateHelper, canUseIfSeqNo, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
}

private static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
BulkPrimaryExecutionContext context, UpdateHelper updateHelper, boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater, CheckedRunnable<Exception> waitForMappingUpdate) throws Exception {

while (context.hasMoreOperationsToExecute()) {
executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
executeBulkItemRequest(context, updateHelper, canUseIfSeqNo, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
assert context.isInitial(); // either completed and moved to next or reset
}
return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(),
null, context.getPrimary(), logger);
}

/** Executes bulk item requests and handles request execution exceptions */
static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper,
boolean canUseIfSeqNo, LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater, CheckedRunnable<Exception> waitForMappingUpdate)
throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();
Expand All @@ -173,7 +179,7 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe
if (opType == DocWriteRequest.OpType.UPDATE) {
final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
try {
updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), canUseIfSeqNo, nowInMillisSupplier);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
Expand Down Expand Up @@ -209,7 +215,7 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe
}

assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

validateDocWriteRequest(context.getRequestToExecute(), canUseIfSeqNo);
if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) {
executeDeleteRequestOnPrimary(context, mappingUpdater);
} else {
Expand Down Expand Up @@ -501,4 +507,24 @@ public void updateMappings(final Mapping update, final ShardId shardId, final St
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update);
}
}

private static void validateDocWriteRequest(DocWriteRequest<?> request, boolean canUseIfSeqNo) {
if (canUseIfSeqNo) {
if (request.versionType() == VersionType.INTERNAL
&& request.version() != Versions.MATCH_ANY
&& request.version() != Versions.MATCH_DELETED) {
DEPRECATION_LOGGER.deprecatedAndMaybeLog("occ_internal_version",
"Usage of internal versioning for optimistic concurrency control is deprecated and will be removed. Please use" +
" the `if_seq_no` and `if_primary_term` parameters instead. (request for index [{}], type [{}], id [{}])",
request.index(), request.type(), request.id());
}
} else {
if (request.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
|| request.ifPrimaryTerm() != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
assert false : "ifSeqNo [" + request.ifSeqNo() + "], ifPrimaryTerm [" + request.ifPrimaryTerm() + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.action.delete;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
Expand All @@ -29,7 +28,6 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -53,7 +51,6 @@
*/
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(LogManager.getLogger(DeleteRequest.class));

private String type;
private String id;
Expand Down Expand Up @@ -102,8 +99,6 @@ public ActionRequestValidationException validate() {

validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);

DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER);

return validationException;
}

Expand Down Expand Up @@ -286,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.action.index;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -37,7 +36,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -75,7 +73,6 @@
* @see org.elasticsearch.client.Client#index(IndexRequest)
*/
public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements DocWriteRequest<IndexRequest>, CompositeIndicesRequest {
private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(LogManager.getLogger(IndexRequest.class));

/**
* Max length of the source document to include into string()
Expand Down Expand Up @@ -200,9 +197,6 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}


DocWriteRequest.logDeprecationWarnings(this, DEPRECATION_LOGGER);

return validationException;
}

Expand Down Expand Up @@ -654,7 +648,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
assert false : "ifSeqNo [" + ifSeqNo + "], ifPrimaryTerm [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand Down Expand Up @@ -174,7 +175,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
final ShardId shardId = request.getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis);
final UpdateHelper.Result result = updateHelper.prepare(
request, indexShard, DocWriteRequest.canUseIfSeqNo(clusterService.state()), threadPool::absoluteTimeInMillis);
switch (result.getResponseResult()) {
case CREATED:
IndexRequest upsertRequest = result.action();
Expand Down
Loading