|
33 | 33 | import org.elasticsearch.action.delete.DeleteRequest; |
34 | 34 | import org.elasticsearch.action.update.UpdateRequestBuilder; |
35 | 35 | import org.elasticsearch.action.update.UpdateResponse; |
| 36 | +import org.elasticsearch.client.transport.NoNodeAvailableException; |
36 | 37 | import org.elasticsearch.common.inject.Inject; |
37 | 38 | import org.elasticsearch.common.settings.ImmutableSettings; |
38 | 39 | import org.elasticsearch.common.unit.TimeValue; |
@@ -570,7 +571,7 @@ public void stressUpdateDeleteConcurrency() throws Exception { |
570 | 571 |
|
571 | 572 | final int numberOfThreads = scaledRandomIntBetween(3,5); |
572 | 573 | final int numberOfIdsPerThread = scaledRandomIntBetween(3,10); |
573 | | - final int numberOfUpdatesPerId = scaledRandomIntBetween(100,200); |
| 574 | + final int numberOfUpdatesPerId = scaledRandomIntBetween(10,100); |
574 | 575 | final int retryOnConflict = randomIntBetween(0,1); |
575 | 576 | final CountDownLatch latch = new CountDownLatch(numberOfThreads); |
576 | 577 | final CountDownLatch startLatch = new CountDownLatch(1); |
@@ -637,22 +638,49 @@ public void onFailure(Throwable e) { |
637 | 638 | public void run(){ |
638 | 639 | try { |
639 | 640 | startLatch.await(); |
| 641 | + boolean hasWaitedForNoNode = false; |
640 | 642 | for (int j = 0; j < numberOfIds; j++) { |
641 | 643 | for (int k = 0; k < numberOfUpdatesPerId; ++k) { |
642 | 644 | updateRequestsOutstanding.acquire(); |
643 | | - UpdateRequest ur = client().prepareUpdate("test", "type1", Integer.toString(j)) |
644 | | - .setScript("ctx._source.field += 1", ScriptService.ScriptType.INLINE) |
645 | | - .setRetryOnConflict(retryOnConflict) |
646 | | - .setUpsert(jsonBuilder().startObject().field("field", 1).endObject()) |
647 | | - .setListenerThreaded(false) |
648 | | - .request(); |
649 | | - client().update(ur, new UpdateListener(j) ); |
650 | | - |
651 | | - deleteRequestsOutstanding.acquire(); |
652 | | - DeleteRequest dr = client().prepareDelete("test", "type1", Integer.toString(j)) |
653 | | - .setListenerThreaded(false) |
654 | | - .setOperationThreaded(false).request(); |
655 | | - client().delete(dr, new DeleteListener(j)); |
| 645 | + try { |
| 646 | + UpdateRequest ur = client().prepareUpdate("test", "type1", Integer.toString(j)) |
| 647 | + .setScript("ctx._source.field += 1", ScriptService.ScriptType.INLINE) |
| 648 | + .setRetryOnConflict(retryOnConflict) |
| 649 | + .setUpsert(jsonBuilder().startObject().field("field", 1).endObject()) |
| 650 | + .setListenerThreaded(false) |
| 651 | + .request(); |
| 652 | + client().update(ur, new UpdateListener(j)); |
| 653 | + } catch (NoNodeAvailableException nne) { |
| 654 | + updateRequestsOutstanding.release(); |
| 655 | + synchronized (failedMap) { |
| 656 | + incrementMapValue(j, failedMap); |
| 657 | + } |
| 658 | + if (hasWaitedForNoNode) { |
| 659 | + throw nne; |
| 660 | + } |
| 661 | + logger.warn("Got NoNodeException waiting for 1 second for things to recover."); |
| 662 | + hasWaitedForNoNode = true; |
| 663 | + Thread.sleep(1000); |
| 664 | + } |
| 665 | + |
| 666 | + try { |
| 667 | + deleteRequestsOutstanding.acquire(); |
| 668 | + DeleteRequest dr = client().prepareDelete("test", "type1", Integer.toString(j)) |
| 669 | + .setListenerThreaded(false) |
| 670 | + .setOperationThreaded(false).request(); |
| 671 | + client().delete(dr, new DeleteListener(j)); |
| 672 | + } catch (NoNodeAvailableException nne) { |
| 673 | + deleteRequestsOutstanding.release(); |
| 674 | + synchronized (failedMap) { |
| 675 | + incrementMapValue(j, failedMap); |
| 676 | + } |
| 677 | + if (hasWaitedForNoNode) { |
| 678 | + throw nne; |
| 679 | + } |
| 680 | + logger.warn("Got NoNodeException waiting for 1 second for things to recover."); |
| 681 | + hasWaitedForNoNode = true; |
| 682 | + Thread.sleep(1000); //Wait for no-node to clear |
| 683 | + } |
656 | 684 | } |
657 | 685 | } |
658 | 686 | } catch (Throwable e) { |
|
0 commit comments