134134import java .util .Collections ;
135135import java .util .Comparator ;
136136import java .util .HashSet ;
137+ import java .util .Iterator ;
137138import java .util .LinkedHashMap ;
138139import java .util .List ;
139140import java .util .Map ;
@@ -1387,17 +1388,17 @@ public void testVersioningCreateExistsException() throws IOException {
13871388
13881389 protected List <Engine .Operation > generateSingleDocHistory (boolean forReplica , VersionType versionType ,
13891390 boolean partialOldPrimary , long primaryTerm ,
1390- int minOpCount , int maxOpCount ) {
1391+ int minOpCount , int maxOpCount , String docId ) {
13911392 final int numOfOps = randomIntBetween (minOpCount , maxOpCount );
13921393 final List <Engine .Operation > ops = new ArrayList <>();
1393- final Term id = newUid ("1" );
1394+ final Term id = newUid (docId );
13941395 final int startWithSeqNo ;
13951396 if (partialOldPrimary ) {
13961397 startWithSeqNo = randomBoolean () ? numOfOps - 1 : randomIntBetween (0 , numOfOps - 1 );
13971398 } else {
13981399 startWithSeqNo = 0 ;
13991400 }
1400- final String valuePrefix = forReplica ? "r_" : "p_" ;
1401+ final String valuePrefix = ( forReplica ? "r_" : "p_" ) + docId + "_ " ;
14011402 final boolean incrementTermWhenIntroducingSeqNo = randomBoolean ();
14021403 for (int i = 0 ; i < numOfOps ; i ++) {
14031404 final Engine .Operation op ;
@@ -1419,7 +1420,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
14191420 throw new UnsupportedOperationException ("unknown version type: " + versionType );
14201421 }
14211422 if (randomBoolean ()) {
1422- op = new Engine .Index (id , testParsedDocument ("1" , null , testDocumentWithTextField (valuePrefix + i ), B_1 , null ),
1423+ op = new Engine .Index (id , testParsedDocument (docId , null , testDocumentWithTextField (valuePrefix + i ), B_1 , null ),
14231424 forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers .UNASSIGNED_SEQ_NO ,
14241425 forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm ,
14251426 version ,
@@ -1428,7 +1429,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
14281429 System .currentTimeMillis (), -1 , false
14291430 );
14301431 } else {
1431- op = new Engine .Delete ("test" , "1" , id ,
1432+ op = new Engine .Delete ("test" , docId , id ,
14321433 forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers .UNASSIGNED_SEQ_NO ,
14331434 forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm ,
14341435 version ,
@@ -1443,7 +1444,7 @@ protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, Ve
14431444
14441445 public void testOutOfOrderDocsOnReplica () throws IOException {
14451446 final List <Engine .Operation > ops = generateSingleDocHistory (true ,
1446- randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL , VersionType .EXTERNAL_GTE , VersionType .FORCE ), false , 2 , 2 , 20 );
1447+ randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL , VersionType .EXTERNAL_GTE , VersionType .FORCE ), false , 2 , 2 , 20 , "1" );
14471448 assertOpsOnReplica (ops , replicaEngine , true );
14481449 }
14491450
@@ -1460,7 +1461,8 @@ public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
14601461 try (Store oldReplicaStore = createStore ();
14611462 InternalEngine replicaEngine =
14621463 createEngine (oldSettings , oldReplicaStore , createTempDir ("translog-old-replica" ), newMergePolicy ())) {
1463- final List <Engine .Operation > ops = generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), true , 2 , 2 , 20 );
1464+ final List <Engine .Operation > ops =
1465+ generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), true , 2 , 2 , 20 , "1" );
14641466 assertOpsOnReplica (ops , replicaEngine , true );
14651467 }
14661468 }
@@ -1530,28 +1532,83 @@ private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine repli
15301532 }
15311533 }
15321534
1533- public void testConcurrentOutOfDocsOnReplica () throws IOException , InterruptedException {
1534- final List <Engine .Operation > ops = generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), false , 2 , 100 , 300 );
1535- final Engine .Operation lastOp = ops .get (ops .size () - 1 );
1536- final String lastFieldValue ;
1537- if (lastOp instanceof Engine .Index ) {
1538- Engine .Index index = (Engine .Index ) lastOp ;
1539- lastFieldValue = index .docs ().get (0 ).get ("value" );
1535+ public void testConcurrentOutOfOrderDocsOnReplica () throws IOException , InterruptedException {
1536+ final List <Engine .Operation > opsDoc1 =
1537+ generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), false , 2 , 100 , 300 , "1" );
1538+ final Engine .Operation lastOpDoc1 = opsDoc1 .get (opsDoc1 .size () - 1 );
1539+ final String lastFieldValueDoc1 ;
1540+ if (lastOpDoc1 instanceof Engine .Index ) {
1541+ Engine .Index index = (Engine .Index ) lastOpDoc1 ;
1542+ lastFieldValueDoc1 = index .docs ().get (0 ).get ("value" );
15401543 } else {
15411544 // delete
1542- lastFieldValue = null ;
1545+ lastFieldValueDoc1 = null ;
1546+ }
1547+ final List <Engine .Operation > opsDoc2 =
1548+ generateSingleDocHistory (true , randomFrom (VersionType .INTERNAL , VersionType .EXTERNAL ), false , 2 , 100 , 300 , "2" );
1549+ final Engine .Operation lastOpDoc2 = opsDoc2 .get (opsDoc2 .size () - 1 );
1550+ final String lastFieldValueDoc2 ;
1551+ if (lastOpDoc2 instanceof Engine .Index ) {
1552+ Engine .Index index = (Engine .Index ) lastOpDoc2 ;
1553+ lastFieldValueDoc2 = index .docs ().get (0 ).get ("value" );
1554+ } else {
1555+ // delete
1556+ lastFieldValueDoc2 = null ;
15431557 }
1544- shuffle (ops , random ());
1545- concurrentlyApplyOps (ops , engine );
1558+ // randomly interleave
1559+ final AtomicLong seqNoGenerator = new AtomicLong ();
1560+ Function <Engine .Operation , Engine .Operation > seqNoUpdater = operation -> {
1561+ final long newSeqNo = seqNoGenerator .getAndIncrement ();
1562+ if (operation instanceof Engine .Index ) {
1563+ Engine .Index index = (Engine .Index ) operation ;
1564+ return new Engine .Index (index .uid (), index .parsedDoc (), newSeqNo , index .primaryTerm (), index .version (),
1565+ index .versionType (), index .origin (), index .startTime (), index .getAutoGeneratedIdTimestamp (), index .isRetry ());
1566+ } else {
1567+ Engine .Delete delete = (Engine .Delete ) operation ;
1568+ return new Engine .Delete (delete .type (), delete .id (), delete .uid (), newSeqNo , delete .primaryTerm (),
1569+ delete .version (), delete .versionType (), delete .origin (), delete .startTime ());
1570+ }
1571+ };
1572+ final List <Engine .Operation > allOps = new ArrayList <>();
1573+ Iterator <Engine .Operation > iter1 = opsDoc1 .iterator ();
1574+ Iterator <Engine .Operation > iter2 = opsDoc2 .iterator ();
1575+ while (iter1 .hasNext () && iter2 .hasNext ()) {
1576+ final Engine .Operation next = randomBoolean () ? iter1 .next () : iter2 .next ();
1577+ allOps .add (seqNoUpdater .apply (next ));
1578+ }
1579+ iter1 .forEachRemaining (o -> allOps .add (seqNoUpdater .apply (o )));
1580+ iter2 .forEachRemaining (o -> allOps .add (seqNoUpdater .apply (o )));
1581+ // insert some duplicates
1582+ allOps .addAll (randomSubsetOf (allOps ));
15461583
1547- assertVisibleCount (engine , lastFieldValue == null ? 0 : 1 );
1548- if (lastFieldValue != null ) {
1584+ shuffle (allOps , random ());
1585+ concurrentlyApplyOps (allOps , engine );
1586+
1587+ engine .refresh ("test" );
1588+
1589+ if (lastFieldValueDoc1 != null ) {
15491590 try (Searcher searcher = engine .acquireSearcher ("test" )) {
15501591 final TotalHitCountCollector collector = new TotalHitCountCollector ();
1551- searcher .searcher ().search (new TermQuery (new Term ("value" , lastFieldValue )), collector );
1592+ searcher .searcher ().search (new TermQuery (new Term ("value" , lastFieldValueDoc1 )), collector );
1593+ assertThat (collector .getTotalHits (), equalTo (1 ));
1594+ }
1595+ }
1596+ if (lastFieldValueDoc2 != null ) {
1597+ try (Searcher searcher = engine .acquireSearcher ("test" )) {
1598+ final TotalHitCountCollector collector = new TotalHitCountCollector ();
1599+ searcher .searcher ().search (new TermQuery (new Term ("value" , lastFieldValueDoc2 )), collector );
15521600 assertThat (collector .getTotalHits (), equalTo (1 ));
15531601 }
15541602 }
1603+
1604+ int totalExpectedOps = 0 ;
1605+ if (lastFieldValueDoc1 != null ) {
1606+ totalExpectedOps ++;
1607+ }
1608+ if (lastFieldValueDoc2 != null ) {
1609+ totalExpectedOps ++;
1610+ }
1611+ assertVisibleCount (engine , totalExpectedOps );
15551612 }
15561613
15571614 private void concurrentlyApplyOps (List <Engine .Operation > ops , InternalEngine engine ) throws InterruptedException {
@@ -1591,12 +1648,12 @@ private void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine eng
15911648 }
15921649
15931650 public void testInternalVersioningOnPrimary () throws IOException {
1594- final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 2 , 20 );
1651+ final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 2 , 20 , "1" );
15951652 assertOpsOnPrimary (ops , Versions .NOT_FOUND , true , engine );
15961653 }
15971654
15981655 public void testVersionOnPrimaryWithConcurrentRefresh () throws Exception {
1599- List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 10 , 100 );
1656+ List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 10 , 100 , "1" );
16001657 CountDownLatch latch = new CountDownLatch (1 );
16011658 AtomicBoolean running = new AtomicBoolean (true );
16021659 Thread refreshThread = new Thread (() -> {
@@ -1716,7 +1773,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
17161773 final Set <VersionType > nonInternalVersioning = new HashSet <>(Arrays .asList (VersionType .values ()));
17171774 nonInternalVersioning .remove (VersionType .INTERNAL );
17181775 final VersionType versionType = randomFrom (nonInternalVersioning );
1719- final List <Engine .Operation > ops = generateSingleDocHistory (false , versionType , false , 2 , 2 , 20 );
1776+ final List <Engine .Operation > ops = generateSingleDocHistory (false , versionType , false , 2 , 2 , 20 , "1" );
17201777 final Engine .Operation lastOp = ops .get (ops .size () - 1 );
17211778 final String lastFieldValue ;
17221779 if (lastOp instanceof Engine .Index ) {
@@ -1794,8 +1851,8 @@ public void testNonInternalVersioningOnPrimary() throws IOException {
17941851 }
17951852
17961853 public void testVersioningPromotedReplica () throws IOException {
1797- final List <Engine .Operation > replicaOps = generateSingleDocHistory (true , VersionType .INTERNAL , false , 1 , 2 , 20 );
1798- List <Engine .Operation > primaryOps = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 2 , 20 );
1854+ final List <Engine .Operation > replicaOps = generateSingleDocHistory (true , VersionType .INTERNAL , false , 1 , 2 , 20 , "1" );
1855+ List <Engine .Operation > primaryOps = generateSingleDocHistory (false , VersionType .INTERNAL , false , 2 , 2 , 20 , "1" );
17991856 Engine .Operation lastReplicaOp = replicaOps .get (replicaOps .size () - 1 );
18001857 final boolean deletedOnReplica = lastReplicaOp instanceof Engine .Delete ;
18011858 final long finalReplicaVersion = lastReplicaOp .version ();
@@ -1815,7 +1872,7 @@ public void testVersioningPromotedReplica() throws IOException {
18151872 }
18161873
18171874 public void testConcurrentExternalVersioningOnPrimary () throws IOException , InterruptedException {
1818- final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .EXTERNAL , false , 2 , 100 , 300 );
1875+ final List <Engine .Operation > ops = generateSingleDocHistory (false , VersionType .EXTERNAL , false , 2 , 100 , 300 , "1" );
18191876 final Engine .Operation lastOp = ops .get (ops .size () - 1 );
18201877 final String lastFieldValue ;
18211878 if (lastOp instanceof Engine .Index ) {
0 commit comments