2222import org .apache .logging .log4j .LogManager ;
2323import org .apache .logging .log4j .Logger ;
2424import org .elasticsearch .ElasticsearchException ;
25+ import org .elasticsearch .Version ;
2526import org .elasticsearch .action .DocWriteResponse ;
2627import org .elasticsearch .action .delete .DeleteRequest ;
2728import org .elasticsearch .action .index .IndexRequest ;
2829import org .elasticsearch .client .Requests ;
30+ import org .elasticsearch .cluster .service .ClusterService ;
2931import org .elasticsearch .common .Nullable ;
3032import org .elasticsearch .common .bytes .BytesReference ;
3133import org .elasticsearch .common .collect .Tuple ;
4143import org .elasticsearch .index .get .GetResult ;
4244import org .elasticsearch .index .mapper .ParentFieldMapper ;
4345import org .elasticsearch .index .mapper .RoutingFieldMapper ;
46+ import org .elasticsearch .index .seqno .SequenceNumbers ;
4447import org .elasticsearch .index .shard .IndexShard ;
4548import org .elasticsearch .index .shard .ShardId ;
4649import org .elasticsearch .script .Script ;
5255import java .util .ArrayList ;
5356import java .util .HashMap ;
5457import java .util .Map ;
58+ import java .util .function .BooleanSupplier ;
5559import java .util .function .LongSupplier ;
5660
5761/**
@@ -62,15 +66,24 @@ public class UpdateHelper {
6266 private static final Logger logger = LogManager .getLogger (UpdateHelper .class );
6367
6468 private final ScriptService scriptService ;
69+ private final BooleanSupplier canUseIfSeqNo ;
6570
66- public UpdateHelper (ScriptService scriptService ) {
71+ public UpdateHelper (ScriptService scriptService , ClusterService clusterService ) {
72+ this (scriptService , () -> clusterService .state ().nodes ().getMinNodeVersion ().onOrAfter (Version .V_6_6_0 ));
73+ }
74+
75+ UpdateHelper (ScriptService scriptService , BooleanSupplier canUseIfSeqNo ) {
6776 this .scriptService = scriptService ;
77+ this .canUseIfSeqNo = canUseIfSeqNo ;
6878 }
6979
7080 /**
7181 * Prepares an update request by converting it into an index or delete request or an update response (no action).
7282 */
7383 public Result prepare (UpdateRequest request , IndexShard indexShard , LongSupplier nowInMillis ) {
84+ if (canUseIfSeqNo .getAsBoolean () == false ) {
85+ ensureIfSeqNoNotProvided (request .ifSeqNo (), request .ifPrimaryTerm ());
86+ }
7487 final GetResult getResult = indexShard .getService ().getForUpdate (
7588 request .type (), request .id (), request .version (), request .versionType (), request .ifSeqNo (), request .ifPrimaryTerm ());
7689 return prepare (indexShard .shardId (), request , getResult , nowInMillis );
@@ -165,6 +178,19 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
165178 return new Result (indexRequest , DocWriteResponse .Result .CREATED , null , null );
166179 }
167180
181+ /**
182+ * Calculate the version to use for the update request, using either the existing version if internal versioning is used, or the get
183+ * result document's version if the version type is "FORCE".
184+ */
185+ static long calculateUpdateVersion (UpdateRequest request , GetResult getResult ) {
186+ if (request .versionType () != VersionType .INTERNAL ) {
187+ assert request .versionType () == VersionType .FORCE ;
188+ return request .version (); // remember, match_any is excluded by the conflict test
189+ } else {
190+ return getResult .getVersion ();
191+ }
192+ }
193+
168194 /**
169195 * Calculate a routing value to be used, either the included index request's routing, or retrieved document's routing when defined.
170196 */
@@ -219,9 +245,13 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
219245 final IndexRequest finalIndexRequest = Requests .indexRequest (request .index ())
220246 .type (request .type ()).id (request .id ()).routing (routing ).parent (parent )
221247 .source (updatedSourceAsMap , updateSourceContentType )
222- .setIfSeqNo (getResult .getSeqNo ()).setIfPrimaryTerm (getResult .getPrimaryTerm ())
223248 .waitForActiveShards (request .waitForActiveShards ()).timeout (request .timeout ())
224249 .setRefreshPolicy (request .getRefreshPolicy ());
250+ if (canUseIfSeqNo .getAsBoolean ()) {
251+ finalIndexRequest .setIfSeqNo (getResult .getSeqNo ()).setIfPrimaryTerm (getResult .getPrimaryTerm ());
252+ } else {
253+ finalIndexRequest .version (calculateUpdateVersion (request , getResult )).versionType (request .versionType ());
254+ }
225255 return new Result (finalIndexRequest , DocWriteResponse .Result .UPDATED , updatedSourceAsMap , updateSourceContentType );
226256 }
227257 }
@@ -261,16 +291,24 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
261291 final IndexRequest indexRequest = Requests .indexRequest (request .index ())
262292 .type (request .type ()).id (request .id ()).routing (routing ).parent (parent )
263293 .source (updatedSourceAsMap , updateSourceContentType )
264- .setIfSeqNo (getResult .getSeqNo ()).setIfPrimaryTerm (getResult .getPrimaryTerm ())
265294 .waitForActiveShards (request .waitForActiveShards ()).timeout (request .timeout ())
266295 .setRefreshPolicy (request .getRefreshPolicy ());
296+ if (canUseIfSeqNo .getAsBoolean ()) {
297+ indexRequest .setIfSeqNo (getResult .getSeqNo ()).setIfPrimaryTerm (getResult .getPrimaryTerm ());
298+ } else {
299+ indexRequest .version (calculateUpdateVersion (request , getResult )).versionType (request .versionType ());
300+ }
267301 return new Result (indexRequest , DocWriteResponse .Result .UPDATED , updatedSourceAsMap , updateSourceContentType );
268302 case DELETE :
269303 DeleteRequest deleteRequest = Requests .deleteRequest (request .index ())
270304 .type (request .type ()).id (request .id ()).routing (routing ).parent (parent )
271- .setIfSeqNo (getResult .getSeqNo ()).setIfPrimaryTerm (getResult .getPrimaryTerm ())
272305 .waitForActiveShards (request .waitForActiveShards ())
273306 .timeout (request .timeout ()).setRefreshPolicy (request .getRefreshPolicy ());
307+ if (canUseIfSeqNo .getAsBoolean ()) {
308+ deleteRequest .setIfSeqNo (getResult .getSeqNo ()).setIfPrimaryTerm (getResult .getPrimaryTerm ());
309+ } else {
310+ deleteRequest .version (calculateUpdateVersion (request , getResult )).versionType (request .versionType ());
311+ }
274312 return new Result (deleteRequest , DocWriteResponse .Result .DELETED , updatedSourceAsMap , updateSourceContentType );
275313 default :
276314 // If it was neither an INDEX or DELETE operation, treat it as a noop
@@ -354,6 +392,14 @@ public static GetResult extractGetResult(final UpdateRequest request, String con
354392 sourceRequested ? sourceFilteredAsBytes : null , fields );
355393 }
356394
395+ private void ensureIfSeqNoNotProvided (long ifSeqNo , long ifPrimaryTerm ) {
396+ if (ifSeqNo != SequenceNumbers .UNASSIGNED_SEQ_NO || ifPrimaryTerm != SequenceNumbers .UNASSIGNED_PRIMARY_TERM ) {
397+ assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]" ;
398+ throw new IllegalStateException (
399+ "sequence number based compare and write is not supported until all nodes are on version 6.6.0 or higher." );
400+ }
401+ }
402+
357403 public static class Result {
358404
359405 private final Streamable action ;
0 commit comments