@@ -358,37 +358,35 @@ public void handleException(TransportException exp) {
358358 });
359359 } else {
360360 setPhase (replicationTask , "primary" );
361- createReplicatedOperation (primaryRequest .getRequest (),
362- ActionListener .wrap (result -> result .respond (
363- new ActionListener <Response >() {
364- @ Override
365- public void onResponse (Response response ) {
366- if (syncGlobalCheckpointAfterOperation ) {
367- final IndexShard shard = primaryShardReference .indexShard ;
368- try {
369- shard .maybeSyncGlobalCheckpoint ("post-operation" );
370- } catch (final Exception e ) {
371- // only log non-closed exceptions
372- if (ExceptionsHelper .unwrap (
373- e , AlreadyClosedException .class , IndexShardClosedException .class ) == null ) {
374- // intentionally swallow, a missed global checkpoint sync should not fail this operation
375- logger .info (
376- new ParameterizedMessage (
377- "{} failed to execute post-operation global checkpoint sync" , shard .shardId ()), e );
378- }
379- }
380- }
381- primaryShardReference .close (); // release shard operation lock before responding to caller
382- setPhase (replicationTask , "finished" );
383- onCompletionListener .onResponse (response );
384- }
385361
386- @ Override
387- public void onFailure (Exception e ) {
388- handleException (primaryShardReference , e );
362+ final ActionListener <Response > referenceClosingListener = ActionListener .wrap (response -> {
363+ primaryShardReference .close (); // release shard operation lock before responding to caller
364+ setPhase (replicationTask , "finished" );
365+ onCompletionListener .onResponse (response );
366+ }, e -> handleException (primaryShardReference , e ));
367+
368+ final ActionListener <Response > globalCheckpointSyncingListener = ActionListener .wrap (response -> {
369+ if (syncGlobalCheckpointAfterOperation ) {
370+ final IndexShard shard = primaryShardReference .indexShard ;
371+ try {
372+ shard .maybeSyncGlobalCheckpoint ("post-operation" );
373+ } catch (final Exception e ) {
374+ // only log non-closed exceptions
375+ if (ExceptionsHelper .unwrap (
376+ e , AlreadyClosedException .class , IndexShardClosedException .class ) == null ) {
377+ // intentionally swallow, a missed global checkpoint sync should not fail this operation
378+ logger .info (
379+ new ParameterizedMessage (
380+ "{} failed to execute post-operation global checkpoint sync" , shard .shardId ()), e );
389381 }
390- }), e -> handleException (primaryShardReference , e )
391- ), primaryShardReference ).execute ();
382+ }
383+ }
384+ referenceClosingListener .onResponse (response );
385+ }, referenceClosingListener ::onFailure );
386+
387+ new ReplicationOperation <>(primaryRequest .getRequest (), primaryShardReference ,
388+ ActionListener .wrap (result -> result .respond (globalCheckpointSyncingListener ), referenceClosingListener ::onFailure ),
389+ newReplicasProxy (), logger , actionName , primaryRequest .getPrimaryTerm ()).execute ();
392390 }
393391 } catch (Exception e ) {
394392 handleException (primaryShardReference , e );
@@ -406,12 +404,6 @@ public void onFailure(Exception e) {
406404 onCompletionListener .onFailure (e );
407405 }
408406
409- protected ReplicationOperation <Request , ReplicaRequest , PrimaryResult <ReplicaRequest , Response >> createReplicatedOperation (
410- Request request , ActionListener <PrimaryResult <ReplicaRequest , Response >> listener ,
411- PrimaryShardReference primaryShardReference ) {
412- return new ReplicationOperation <>(request , primaryShardReference , listener ,
413- newReplicasProxy (), logger , actionName , primaryRequest .getPrimaryTerm ());
414- }
415407 }
416408
417409 public static class PrimaryResult <ReplicaRequest extends ReplicationRequest <ReplicaRequest >,
0 commit comments