@@ -336,14 +336,65 @@ public void testNotificationUsesExecutor() {
336336 };
337337 final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
338338 globalCheckpointListeners .globalCheckpointUpdated (NO_OPS_PERFORMED );
339+ final long globalCheckpoint = randomLongBetween (NO_OPS_PERFORMED , Long .MAX_VALUE );
340+ final AtomicInteger notified = new AtomicInteger ();
339341 final int numberOfListeners = randomIntBetween (0 , 16 );
340342 for (int i = 0 ; i < numberOfListeners ; i ++) {
341- globalCheckpointListeners .add (NO_OPS_PERFORMED , (g , e ) -> {});
343+ globalCheckpointListeners .add (NO_OPS_PERFORMED , (g , e ) -> {
344+ notified .incrementAndGet ();
345+ assertThat (g , equalTo (globalCheckpoint ));
346+ assertNull (e );
347+ });
342348 }
343- globalCheckpointListeners .globalCheckpointUpdated (randomLongBetween (NO_OPS_PERFORMED , Long .MAX_VALUE ));
349+ globalCheckpointListeners .globalCheckpointUpdated (globalCheckpoint );
350+ assertThat (notified .get (), equalTo (numberOfListeners ));
344351 assertThat (count .get (), equalTo (numberOfListeners == 0 ? 0 : 1 ));
345352 }
346353
354+ public void testNotificationOnClosedUsesExecutor () throws IOException {
355+ final AtomicInteger count = new AtomicInteger ();
356+ final Executor executor = command -> {
357+ count .incrementAndGet ();
358+ command .run ();
359+ };
360+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
361+ globalCheckpointListeners .close ();
362+ final AtomicInteger notified = new AtomicInteger ();
363+ final int numberOfListeners = randomIntBetween (0 , 16 );
364+ for (int i = 0 ; i < numberOfListeners ; i ++) {
365+ globalCheckpointListeners .add (NO_OPS_PERFORMED , (g , e ) -> {
366+ notified .incrementAndGet ();
367+ assertThat (g , equalTo (UNASSIGNED_SEQ_NO ));
368+ assertNotNull (e );
369+ assertThat (e .getShardId (), equalTo (shardId ));
370+ });
371+ }
372+ assertThat (notified .get (), equalTo (numberOfListeners ));
373+ assertThat (count .get (), equalTo (numberOfListeners ));
374+ }
375+
376+ public void testListenersReadyToBeNotifiedUsesExecutor () {
377+ final AtomicInteger count = new AtomicInteger ();
378+ final Executor executor = command -> {
379+ count .incrementAndGet ();
380+ command .run ();
381+ };
382+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
383+ final long globalCheckpoint = randomNonNegativeLong ();
384+ globalCheckpointListeners .globalCheckpointUpdated (globalCheckpoint );
385+ final AtomicInteger notified = new AtomicInteger ();
386+ final int numberOfListeners = randomIntBetween (0 , 16 );
387+ for (int i = 0 ; i < numberOfListeners ; i ++) {
388+ globalCheckpointListeners .add (randomLongBetween (0 , globalCheckpoint ), (g , e ) -> {
389+ notified .incrementAndGet ();
390+ assertThat (g , equalTo (globalCheckpoint ));
391+ assertNull (e );
392+ });
393+ }
394+ assertThat (notified .get (), equalTo (numberOfListeners ));
395+ assertThat (count .get (), equalTo (numberOfListeners ));
396+ }
397+
347398 public void testConcurrency () throws BrokenBarrierException , InterruptedException {
348399 final ExecutorService executor = Executors .newFixedThreadPool (randomIntBetween (1 , 8 ));
349400 final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
0 commit comments