diff --git a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java index 9100abcb0..45a614597 100644 --- a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java +++ b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java @@ -23,7 +23,6 @@ public class RemoveWorkHandlerTest public void removeWorkHandlerLostEventExample() throws InterruptedException { int eventSize = 8; - Set data = initData(0, eventSize); CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize); AtomicInteger count = new AtomicInteger(); @@ -32,7 +31,7 @@ public void removeWorkHandlerLostEventExample() throws InterruptedException // Build a disruptor and start it. Disruptor disruptor = new Disruptor<>( - StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE); + StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.start(); // Construct 2 batch event processors. @@ -57,13 +56,17 @@ public void removeWorkHandlerLostEventExample() throws InterruptedException handler2.awaitStart(); //add event - new MessageProducer(disruptor, data).addEvent(); + MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize)); + new Thread(producer1).start(); + producer1.awaitStart(); // use halt remove a processor will lost a event processor1.halt(); //Make sure new event are added after processor1.halt() - new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent(); + MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize)); + new Thread(producer2).start(); + producer2.awaitStart(); //waiting remove complete handler1.awaitShutdown(); @@ -92,7 +95,7 @@ public void removeWorkHandlerLaterTest() throws InterruptedException // Build a disruptor and start it. Disruptor disruptor = new Disruptor<>( - StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE); + StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.start(); // Construct 2 batch event processors. @@ -116,14 +119,17 @@ public void removeWorkHandlerLaterTest() throws InterruptedException handler1.awaitStart(); handler2.awaitStart(); - new MessageProducer(disruptor, initData(0, eventSize)).addEvent(); + MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize)); + new Thread(producer1).start(); + producer1.awaitStart(); // haltLater will wait the last event consume processor1.haltLater(); //Make sure new event are added after processor1.haltLater() - new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent(); - + MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize)); + new Thread(producer2).start(); + producer2.awaitStart(); handler1.awaitShutdown(); @@ -143,10 +149,11 @@ private Set initData(int start, int size) } } -class MessageProducer +class MessageProducer implements Runnable { - Disruptor disruptor; - Set dataSet; + private Disruptor disruptor; + private Set dataSet; + private CountDownLatch startLatch = new CountDownLatch(1); MessageProducer(Disruptor disruptor, Set dataSet) { @@ -170,6 +177,29 @@ void addEvent() } } } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() + { + startLatch.countDown(); + addEvent(); + } + + public void awaitStart() throws InterruptedException + { + startLatch.await(); + } } class DynamicHandler implements WorkHandler, LifecycleAware @@ -214,6 +244,10 @@ public void onEvent(StubEvent event) throws Exception { countdownlatch.countDown(); completeCount.incrementAndGet(); + System.out.println( + "thread id == " + Thread.currentThread().getId() + " ,event ==> " + event.getValue() + ); + Thread.yield(); } }