5353import org .elasticsearch .xpack .transform .notifications .MockTransformAuditor ;
5454import org .elasticsearch .xpack .transform .notifications .TransformAuditor ;
5555import org .elasticsearch .xpack .transform .persistence .IndexBasedTransformConfigManager ;
56+ import org .elasticsearch .xpack .transform .persistence .SeqNoPrimaryTermAndIndex ;
5657import org .junit .After ;
5758import org .junit .Before ;
5859
6364import java .util .concurrent .CountDownLatch ;
6465import java .util .concurrent .TimeUnit ;
6566import java .util .concurrent .atomic .AtomicBoolean ;
67+ import java .util .concurrent .atomic .AtomicInteger ;
6668import java .util .concurrent .atomic .AtomicReference ;
6769import java .util .function .Consumer ;
6870import java .util .function .Function ;
@@ -90,7 +92,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
9092 private Client client ;
9193 private ThreadPool threadPool ;
9294
93- class MockedTransformIndexer extends TransformIndexer {
95+ static class MockedTransformIndexer extends ClientTransformIndexer {
9496
9597 private final Function <SearchRequest , SearchResponse > searchFunction ;
9698 private final Function <BulkRequest , BulkResponse > bulkFunction ;
@@ -126,14 +128,17 @@ class MockedTransformIndexer extends TransformIndexer {
126128 mock (SchedulerEngine .class )
127129 ),
128130 checkpointProvider ,
129- transformConfig ,
130131 initialState ,
131132 initialPosition ,
133+ mock (Client .class ),
132134 jobStats ,
135+ transformConfig ,
133136 /* TransformProgress */ null ,
134137 TransformCheckpoint .EMPTY ,
135138 TransformCheckpoint .EMPTY ,
136- context
139+ new SeqNoPrimaryTermAndIndex (1 , 1 , "foo" ),
140+ context ,
141+ false
137142 );
138143 this .searchFunction = searchFunction ;
139144 this .bulkFunction = bulkFunction ;
@@ -188,7 +193,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
188193
189194 try {
190195 BulkResponse response = bulkFunction .apply (request );
191- nextPhase . onResponse (response );
196+ super . handleBulkResponse (response , nextPhase );
192197 } catch (Exception e ) {
193198 nextPhase .onFailure (e );
194199 }
@@ -253,7 +258,7 @@ void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse>
253258 }
254259
255260 @ Override
256- void doDeleteByQuery (DeleteByQueryRequest deleteByQueryRequest , ActionListener <BulkByScrollResponse > responseListener ) {
261+ protected void doDeleteByQuery (DeleteByQueryRequest deleteByQueryRequest , ActionListener <BulkByScrollResponse > responseListener ) {
257262 try {
258263 BulkByScrollResponse response = deleteByQueryFunction .apply (deleteByQueryRequest );
259264 responseListener .onResponse (response );
@@ -263,7 +268,7 @@ void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener<B
263268 }
264269
265270 @ Override
266- void refreshDestinationIndex (ActionListener <RefreshResponse > responseListener ) {
271+ protected void refreshDestinationIndex (ActionListener <RefreshResponse > responseListener ) {
267272 responseListener .onResponse (new RefreshResponse (1 , 1 , 0 , Collections .emptyList ()));
268273 }
269274
@@ -705,6 +710,116 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce
705710 assertEquals (1 , context .getFailureCount ());
706711 }
707712
713+ public void testFailureCounterIsResetOnSuccess () throws Exception {
714+ String transformId = randomAlphaOfLength (10 );
715+ TransformConfig config = new TransformConfig (
716+ transformId ,
717+ randomSourceConfig (),
718+ randomDestConfig (),
719+ null ,
720+ null ,
721+ null ,
722+ randomPivotConfig (),
723+ null ,
724+ randomBoolean () ? null : randomAlphaOfLengthBetween (1 , 1000 ),
725+ null ,
726+ null ,
727+ null ,
728+ null
729+ );
730+
731+ final SearchResponse searchResponse = new SearchResponse (
732+ new InternalSearchResponse (
733+ new SearchHits (new SearchHit [] { new SearchHit (1 ) }, new TotalHits (1L , TotalHits .Relation .EQUAL_TO ), 1.0f ),
734+ // Simulate completely null aggs
735+ null ,
736+ new Suggest (Collections .emptyList ()),
737+ new SearchProfileShardResults (Collections .emptyMap ()),
738+ false ,
739+ false ,
740+ 1
741+ ),
742+ "" ,
743+ 1 ,
744+ 1 ,
745+ 0 ,
746+ 0 ,
747+ ShardSearchFailure .EMPTY_ARRAY ,
748+ SearchResponse .Clusters .EMPTY
749+ );
750+
751+ AtomicReference <IndexerState > state = new AtomicReference <>(IndexerState .STOPPED );
752+ Function <SearchRequest , SearchResponse > searchFunction = new Function <>() {
753+ final AtomicInteger calls = new AtomicInteger (0 );
754+
755+ @ Override
756+ public SearchResponse apply (SearchRequest searchRequest ) {
757+ int call = calls .getAndIncrement ();
758+ if (call == 0 ) {
759+ throw new SearchPhaseExecutionException (
760+ "query" ,
761+ "Partial shards failure" ,
762+ new ShardSearchFailure [] { new ShardSearchFailure (new Exception ()) }
763+ );
764+ }
765+ return searchResponse ;
766+ }
767+ };
768+
769+ Function <BulkRequest , BulkResponse > bulkFunction = request -> new BulkResponse (new BulkItemResponse [0 ], 1 );
770+
771+ final AtomicBoolean failIndexerCalled = new AtomicBoolean (false );
772+ final AtomicReference <String > failureMessage = new AtomicReference <>();
773+ Consumer <String > failureConsumer = message -> {
774+ failIndexerCalled .compareAndSet (false , true );
775+ failureMessage .compareAndSet (null , message );
776+ };
777+
778+ MockTransformAuditor auditor = MockTransformAuditor .createMockAuditor ();
779+ TransformContext .Listener contextListener = mock (TransformContext .Listener .class );
780+ TransformContext context = new TransformContext (TransformTaskState .STARTED , "" , 0 , contextListener );
781+
782+ MockedTransformIndexer indexer = createMockIndexer (
783+ config ,
784+ state ,
785+ searchFunction ,
786+ bulkFunction ,
787+ null ,
788+ failureConsumer ,
789+ threadPool ,
790+ ThreadPool .Names .GENERIC ,
791+ auditor ,
792+ context
793+ );
794+
795+ final CountDownLatch latch = indexer .newLatch (1 );
796+
797+ indexer .start ();
798+ assertThat (indexer .getState (), equalTo (IndexerState .STARTED ));
799+ assertTrue (indexer .maybeTriggerAsyncJob (System .currentTimeMillis ()));
800+ assertThat (indexer .getState (), equalTo (IndexerState .INDEXING ));
801+
802+ latch .countDown ();
803+ assertBusy (() -> assertThat (indexer .getState (), equalTo (IndexerState .STARTED )), 10 , TimeUnit .SECONDS );
804+ assertFalse (failIndexerCalled .get ());
805+ assertThat (indexer .getState (), equalTo (IndexerState .STARTED ));
806+ assertEquals (1 , context .getFailureCount ());
807+
808+ final CountDownLatch secondLatch = indexer .newLatch (1 );
809+
810+ indexer .start ();
811+ assertThat (indexer .getState (), equalTo (IndexerState .STARTED ));
812+ assertTrue (indexer .maybeTriggerAsyncJob (System .currentTimeMillis ()));
813+ assertThat (indexer .getState (), equalTo (IndexerState .INDEXING ));
814+
815+ secondLatch .countDown ();
816+ assertBusy (() -> assertThat (indexer .getState (), equalTo (IndexerState .STARTED )), 10 , TimeUnit .SECONDS );
817+ assertFalse (failIndexerCalled .get ());
818+ assertThat (indexer .getState (), equalTo (IndexerState .STARTED ));
819+ auditor .assertAllExpectationsMatched ();
820+ assertEquals (0 , context .getFailureCount ());
821+ }
822+
708823 private MockedTransformIndexer createMockIndexer (
709824 TransformConfig config ,
710825 AtomicReference <IndexerState > state ,
0 commit comments