From 2b4b39ee8a3ab674a7966cadf76c2c0c864cd2f6 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 19 Nov 2019 10:29:15 +0800 Subject: [PATCH 1/9] dynamic remove handler without message lost --- src/main/java/com/lmax/disruptor/WorkProcessor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/lmax/disruptor/WorkProcessor.java b/src/main/java/com/lmax/disruptor/WorkProcessor.java index aa8efe117..aa45ccddb 100644 --- a/src/main/java/com/lmax/disruptor/WorkProcessor.java +++ b/src/main/java/com/lmax/disruptor/WorkProcessor.java @@ -88,7 +88,6 @@ public Sequence getSequence() public void halt() { running.set(false); - sequenceBarrier.alert(); } @Override @@ -126,6 +125,7 @@ public void run() // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler + sequenceBarrier.clearAlert(); if (processedSequence) { processedSequence = false; @@ -142,6 +142,9 @@ public void run() event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; + if (!running.get()){ + sequenceBarrier.alert(); + } } else { From 8de6af50890808a4ad499881452b962c6fd2fbe1 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 19 Nov 2019 10:31:10 +0800 Subject: [PATCH 2/9] dynamic remove handler without message lost --- src/main/java/com/lmax/disruptor/WorkProcessor.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/WorkProcessor.java b/src/main/java/com/lmax/disruptor/WorkProcessor.java index aa45ccddb..e67617f60 100644 --- a/src/main/java/com/lmax/disruptor/WorkProcessor.java +++ b/src/main/java/com/lmax/disruptor/WorkProcessor.java @@ -125,9 +125,13 @@ public void run() // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler - sequenceBarrier.clearAlert(); + if (processedSequence) { + if (!running.get()){ + sequenceBarrier.alert(); + sequenceBarrier.checkAlert(); + } processedSequence = false; do { @@ -142,9 +146,6 @@ public void run() event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; - if (!running.get()){ - sequenceBarrier.alert(); - } } else { From 68b271af48b345a17d730ab6faa0e61eb4cd838a Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 31 Dec 2019 13:04:21 +0800 Subject: [PATCH 3/9] dynamic remove handler without message lost --- src/main/java/com/lmax/disruptor/WorkProcessor.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/com/lmax/disruptor/WorkProcessor.java b/src/main/java/com/lmax/disruptor/WorkProcessor.java index e67617f60..fe058a318 100644 --- a/src/main/java/com/lmax/disruptor/WorkProcessor.java +++ b/src/main/java/com/lmax/disruptor/WorkProcessor.java @@ -86,6 +86,15 @@ public Sequence getSequence() @Override public void halt() + { + running.set(false); + sequenceBarrier.alert(); + } + + /** + * remove workProcessor dynamic without message lost + */ + public void haltLater() { running.set(false); } From d80ce0bf39183c544798ab0a62d1a6eee3e76b35 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 31 Dec 2019 13:38:51 +0800 Subject: [PATCH 4/9] add example --- .../example/DynamicAddWorkHandler.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java diff --git a/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java new file mode 100644 index 000000000..3c4c6a48e --- /dev/null +++ b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java @@ -0,0 +1,110 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.support.StubEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @Author : Rookiex + * @Date : Created in 2019/12/31 13:05 + * @Describe : + * @version: + */ +public class DynamicAddWorkHandler { + + + private static class MessageProduce implements Runnable { + Disruptor disruptor; + + MessageProduce(Disruptor disruptor) { + this.disruptor = disruptor; + } + + + @Override + public void run() { + int msgCount = 10; + for (int i = 0; i < msgCount; i++) { + StubEvent stubEvent = StubEvent.EVENT_FACTORY.newInstance(); + stubEvent.setTestString("msg => " + i); + final int finalI = i; + disruptor.getRingBuffer().publishEvent((a, b, c) -> { + a.setTestString("msg => " + finalI); + }); + } + } + } + + private static class DynamicHandler implements WorkHandler, LifecycleAware { + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + + + @Override + public void onStart() { + + } + + @Override + public void onShutdown() { + shutdownLatch.countDown(); + } + + public void awaitShutdown() throws InterruptedException { + shutdownLatch.await(); + } + + @Override + public void onEvent(StubEvent event) throws Exception { + System.out.println(event.getTestString() + " ==> " + Thread.currentThread().getId()); + } + } + + public static void main(String[] args) throws InterruptedException { + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + + // Build a disruptor and start it. + Disruptor disruptor = new Disruptor( + StubEvent.EVENT_FACTORY, 1024, DaemonThreadFactory.INSTANCE); + RingBuffer ringBuffer = disruptor.start(); + + // Construct 2 batch event processors. + DynamicAddWorkHandler.DynamicHandler handler1 = new DynamicAddWorkHandler.DynamicHandler(); + WorkProcessor processor1 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1, new FatalExceptionHandler(), workSequence); + + DynamicAddWorkHandler.DynamicHandler handler2 = new DynamicAddWorkHandler.DynamicHandler(); + WorkProcessor processor2 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler2, new FatalExceptionHandler(), workSequence); + + // Dynamically add both sequences to the ring buffer + ringBuffer.addGatingSequences(processor1.getSequence()); + // Start the new batch processors. + executor.execute(processor1); + + ringBuffer.addGatingSequences(processor2.getSequence()); + executor.execute(processor2); + + Thread thread1 = new Thread(new MessageProduce(disruptor)); + thread1.start(); + + Thread.sleep(2000); + + // Remove a processor. + // Stop the processor , processor2.haltLater() will wait for all processor2 message processing to complete + processor2.haltLater(); + + Thread thread2 = new Thread(new MessageProduce(disruptor)); + thread2.start(); + + // Wait for shutdown the complete + handler2.awaitShutdown(); + // Remove the gating sequence from the ring buffer + ringBuffer.removeGatingSequence(processor2.getSequence()); + } +} From 63e5a77560f01e885d49976d62a22292ae5f7723 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 31 Dec 2019 14:08:23 +0800 Subject: [PATCH 5/9] remove lambda --- .../example/DynamicAddWorkHandler.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java index 3c4c6a48e..1cfcecb9e 100644 --- a/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java +++ b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java @@ -12,30 +12,32 @@ /** * @Author : Rookiex * @Date : Created in 2019/12/31 13:05 - * @Describe : - * @version: + * @Describe : Dynamic WorkHandler */ public class DynamicAddWorkHandler { private static class MessageProduce implements Runnable { Disruptor disruptor; - - MessageProduce(Disruptor disruptor) { + int start; + int over; + MessageProduce(Disruptor disruptor,int start,int over) { this.disruptor = disruptor; + this.start = start; + this.over = over; } - @Override public void run() { - int msgCount = 10; - for (int i = 0; i < msgCount; i++) { - StubEvent stubEvent = StubEvent.EVENT_FACTORY.newInstance(); - stubEvent.setTestString("msg => " + i); - final int finalI = i; - disruptor.getRingBuffer().publishEvent((a, b, c) -> { - a.setTestString("msg => " + finalI); - }); + for (int i = start; i < over + start; i++) { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + long sequence = ringBuffer.next(); + try { + StubEvent event = ringBuffer.get(sequence); + event.setTestString("msg => " + i); + } finally{ + ringBuffer.publish(sequence); + } } } } @@ -54,13 +56,13 @@ public void onShutdown() { shutdownLatch.countDown(); } - public void awaitShutdown() throws InterruptedException { + void awaitShutdown() throws InterruptedException { shutdownLatch.await(); } @Override public void onEvent(StubEvent event) throws Exception { - System.out.println(event.getTestString() + " ==> " + Thread.currentThread().getId()); + System.out.println(event.getTestString() + " ,thread ==> " + Thread.currentThread().getId()); } } @@ -69,7 +71,7 @@ public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); // Build a disruptor and start it. - Disruptor disruptor = new Disruptor( + Disruptor disruptor = new Disruptor<>( StubEvent.EVENT_FACTORY, 1024, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.start(); @@ -90,7 +92,7 @@ public static void main(String[] args) throws InterruptedException { ringBuffer.addGatingSequences(processor2.getSequence()); executor.execute(processor2); - Thread thread1 = new Thread(new MessageProduce(disruptor)); + Thread thread1 = new Thread(new MessageProduce(disruptor,0,100)); thread1.start(); Thread.sleep(2000); @@ -99,7 +101,7 @@ public static void main(String[] args) throws InterruptedException { // Stop the processor , processor2.haltLater() will wait for all processor2 message processing to complete processor2.haltLater(); - Thread thread2 = new Thread(new MessageProduce(disruptor)); + Thread thread2 = new Thread(new MessageProduce(disruptor,100,200)); thread2.start(); // Wait for shutdown the complete From 52dba206a4f0ad706583b56b793aa220712379e5 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 11 Feb 2020 20:58:16 +0800 Subject: [PATCH 6/9] JUnit test that shows the code failing and the change fixing the problem --- .../lmax/disruptor/RemoveWorkHandlerTest.java | 219 ++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java diff --git a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java new file mode 100644 index 000000000..9100abcb0 --- /dev/null +++ b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java @@ -0,0 +1,219 @@ +package com.lmax.disruptor; + +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.support.StubEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @Author : Rookiex + * @Date : Created in 2020/2/11 10:45 + * @Describe : + * @version: + */ +public class RemoveWorkHandlerTest +{ + + @Test + public void removeWorkHandlerLostEventExample() throws InterruptedException + { + int eventSize = 8; + Set data = initData(0, eventSize); + CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize); + AtomicInteger count = new AtomicInteger(); + + ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + // Build a disruptor and start it. + Disruptor disruptor = new Disruptor<>( + StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE); + RingBuffer ringBuffer = disruptor.start(); + + // Construct 2 batch event processors. + DynamicHandler handler1 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor1 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1, new FatalExceptionHandler(), workSequence); + + DynamicHandler handler2 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor2 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler2, new FatalExceptionHandler(), workSequence); + + // Dynamically add both sequences to the ring buffer + ringBuffer.addGatingSequences(processor1.getSequence()); + // Start the new batch processors. + executor.execute(processor1); + + ringBuffer.addGatingSequences(processor2.getSequence()); + executor.execute(processor2); + + //wait handler start + handler1.awaitStart(); + handler2.awaitStart(); + + //add event + new MessageProducer(disruptor, data).addEvent(); + + // use halt remove a processor will lost a event + processor1.halt(); + + //Make sure new event are added after processor1.halt() + new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent(); + + //waiting remove complete + handler1.awaitShutdown(); + + ringBuffer.removeGatingSequence(processor1.getSequence()); + + //waiting handler consume event(Because there is a event lost, it will be blocked here) + boolean await = countDownLatch.await(3, TimeUnit.SECONDS); + + Assert.assertFalse(await); + long lastCount = countDownLatch.getCount(); + int countValue = count.get(); + Assert.assertEquals(lastCount + countValue, eventSize * 2); + Assert.assertTrue(lastCount > 0); + } + + @Test + public void removeWorkHandlerLaterTest() throws InterruptedException + { + int eventSize = 8; + CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize); + AtomicInteger count = new AtomicInteger(); + + ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + // Build a disruptor and start it. + Disruptor disruptor = new Disruptor<>( + StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE); + RingBuffer ringBuffer = disruptor.start(); + + // Construct 2 batch event processors. + DynamicHandler handler1 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor1 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1, new FatalExceptionHandler(), workSequence); + + DynamicHandler handler2 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor2 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler2, new FatalExceptionHandler(), workSequence); + + // Dynamically add both sequences to the ring buffer + ringBuffer.addGatingSequences(processor1.getSequence()); + // Start the new batch processors. + executor.execute(processor1); + + ringBuffer.addGatingSequences(processor2.getSequence()); + executor.execute(processor2); + + //wait handler start + handler1.awaitStart(); + handler2.awaitStart(); + + new MessageProducer(disruptor, initData(0, eventSize)).addEvent(); + + // haltLater will wait the last event consume + processor1.haltLater(); + + //Make sure new event are added after processor1.haltLater() + new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent(); + + + handler1.awaitShutdown(); + + ringBuffer.removeGatingSequence(processor1.getSequence()); + + //waiting handler consume event(Because there is a event lost, it will be blocked here) + Assert.assertTrue(countDownLatch.await(3, TimeUnit.SECONDS)); + } + + private Set initData(int start, int size) + { + Set dataSet = new ConcurrentSkipListSet<>(); + for (int i = start; i < size + start; i++) { + dataSet.add(i); + } + return dataSet; + } +} + +class MessageProducer +{ + Disruptor disruptor; + Set dataSet; + + MessageProducer(Disruptor disruptor, Set dataSet) + { + this.disruptor = disruptor; + this.dataSet = dataSet; + } + + void addEvent() + { + for (int i : dataSet) + { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + long sequence = ringBuffer.next(); + try + { + StubEvent event = ringBuffer.get(sequence); + event.setValue(i); + } finally + { + ringBuffer.publish(sequence); + } + } + } +} + +class DynamicHandler implements WorkHandler, LifecycleAware +{ + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final CountDownLatch startLatch = new CountDownLatch(1); + + private AtomicInteger completeCount; + + private CountDownLatch countdownlatch; + + DynamicHandler(AtomicInteger completeCount, CountDownLatch countdownlatch) + { + this.countdownlatch = countdownlatch; + this.completeCount = completeCount; + } + + @Override + public void onStart() + { + startLatch.countDown(); + } + + @Override + public void onShutdown() + { + shutdownLatch.countDown(); + } + + void awaitShutdown() throws InterruptedException + { + shutdownLatch.await(); + } + + void awaitStart() throws InterruptedException + { + startLatch.await(); + } + + @Override + public void onEvent(StubEvent event) throws Exception + { + countdownlatch.countDown(); + completeCount.incrementAndGet(); + } +} + From 23b4dd76fc763c7f275b77a0e6eeb62f2819fa98 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 11 Feb 2020 21:00:44 +0800 Subject: [PATCH 7/9] Canonical code format --- .../example/DynamicAddWorkHandler.java | 46 +++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java index 1cfcecb9e..f12b52209 100644 --- a/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java +++ b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java @@ -14,59 +14,73 @@ * @Date : Created in 2019/12/31 13:05 * @Describe : Dynamic WorkHandler */ -public class DynamicAddWorkHandler { +public class DynamicAddWorkHandler +{ - private static class MessageProduce implements Runnable { + private static class MessageProduce implements Runnable + { Disruptor disruptor; int start; int over; - MessageProduce(Disruptor disruptor,int start,int over) { + MessageProduce(Disruptor disruptor,int start,int over) + { this.disruptor = disruptor; this.start = start; this.over = over; } @Override - public void run() { - for (int i = start; i < over + start; i++) { + public void run() + { + for (int i = start; i < over + start; i++) + { RingBuffer ringBuffer = disruptor.getRingBuffer(); long sequence = ringBuffer.next(); - try { + try + { StubEvent event = ringBuffer.get(sequence); event.setTestString("msg => " + i); - } finally{ + } + finally + { ringBuffer.publish(sequence); } } } } - private static class DynamicHandler implements WorkHandler, LifecycleAware { + private static class DynamicHandler implements WorkHandler, LifecycleAware + { private final CountDownLatch shutdownLatch = new CountDownLatch(1); @Override - public void onStart() { + public void onStart() + { } @Override - public void onShutdown() { + public void onShutdown() + { shutdownLatch.countDown(); } - void awaitShutdown() throws InterruptedException { + void awaitShutdown() throws InterruptedException + { shutdownLatch.await(); } @Override - public void onEvent(StubEvent event) throws Exception { + public void onEvent(StubEvent event) throws Exception + { System.out.println(event.getTestString() + " ,thread ==> " + Thread.currentThread().getId()); } } - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws InterruptedException + { Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); @@ -75,6 +89,8 @@ public static void main(String[] args) throws InterruptedException { StubEvent.EVENT_FACTORY, 1024, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.start(); + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + // Construct 2 batch event processors. DynamicAddWorkHandler.DynamicHandler handler1 = new DynamicAddWorkHandler.DynamicHandler(); WorkProcessor processor1 = @@ -95,10 +111,10 @@ public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(new MessageProduce(disruptor,0,100)); thread1.start(); - Thread.sleep(2000); + Thread.sleep(1000); // Remove a processor. - // Stop the processor , processor2.haltLater() will wait for all processor2 message processing to complete + // Stop the processor , processor2.haltLater() will wait for processor2 message processing to complete processor2.haltLater(); Thread thread2 = new Thread(new MessageProduce(disruptor,100,200)); From b16d950c67a94608e0267477f65650cbb05204b4 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 11 Feb 2020 21:36:46 +0800 Subject: [PATCH 8/9] fix Junit test bug --- .../lmax/disruptor/RemoveWorkHandlerTest.java | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java index 9100abcb0..45a614597 100644 --- a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java +++ b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java @@ -23,7 +23,6 @@ public class RemoveWorkHandlerTest public void removeWorkHandlerLostEventExample() throws InterruptedException { int eventSize = 8; - Set data = initData(0, eventSize); CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize); AtomicInteger count = new AtomicInteger(); @@ -32,7 +31,7 @@ public void removeWorkHandlerLostEventExample() throws InterruptedException // Build a disruptor and start it. Disruptor disruptor = new Disruptor<>( - StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE); + StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.start(); // Construct 2 batch event processors. @@ -57,13 +56,17 @@ public void removeWorkHandlerLostEventExample() throws InterruptedException handler2.awaitStart(); //add event - new MessageProducer(disruptor, data).addEvent(); + MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize)); + new Thread(producer1).start(); + producer1.awaitStart(); // use halt remove a processor will lost a event processor1.halt(); //Make sure new event are added after processor1.halt() - new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent(); + MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize)); + new Thread(producer2).start(); + producer2.awaitStart(); //waiting remove complete handler1.awaitShutdown(); @@ -92,7 +95,7 @@ public void removeWorkHandlerLaterTest() throws InterruptedException // Build a disruptor and start it. Disruptor disruptor = new Disruptor<>( - StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE); + StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.start(); // Construct 2 batch event processors. @@ -116,14 +119,17 @@ public void removeWorkHandlerLaterTest() throws InterruptedException handler1.awaitStart(); handler2.awaitStart(); - new MessageProducer(disruptor, initData(0, eventSize)).addEvent(); + MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize)); + new Thread(producer1).start(); + producer1.awaitStart(); // haltLater will wait the last event consume processor1.haltLater(); //Make sure new event are added after processor1.haltLater() - new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent(); - + MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize)); + new Thread(producer2).start(); + producer2.awaitStart(); handler1.awaitShutdown(); @@ -143,10 +149,11 @@ private Set initData(int start, int size) } } -class MessageProducer +class MessageProducer implements Runnable { - Disruptor disruptor; - Set dataSet; + private Disruptor disruptor; + private Set dataSet; + private CountDownLatch startLatch = new CountDownLatch(1); MessageProducer(Disruptor disruptor, Set dataSet) { @@ -170,6 +177,29 @@ void addEvent() } } } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() + { + startLatch.countDown(); + addEvent(); + } + + public void awaitStart() throws InterruptedException + { + startLatch.await(); + } } class DynamicHandler implements WorkHandler, LifecycleAware @@ -214,6 +244,10 @@ public void onEvent(StubEvent event) throws Exception { countdownlatch.countDown(); completeCount.incrementAndGet(); + System.out.println( + "thread id == " + Thread.currentThread().getId() + " ,event ==> " + event.getValue() + ); + Thread.yield(); } } From 89f3a82fa3deb087ac9c894b9b3142d350273a09 Mon Sep 17 00:00:00 2001 From: rookiex <774590465@qq.com> Date: Tue, 11 Feb 2020 21:45:25 +0800 Subject: [PATCH 9/9] update notes --- src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java index 45a614597..dada3dbee 100644 --- a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java +++ b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java @@ -135,7 +135,7 @@ public void removeWorkHandlerLaterTest() throws InterruptedException ringBuffer.removeGatingSequence(processor1.getSequence()); - //waiting handler consume event(Because there is a event lost, it will be blocked here) + //waiting handler consume event Assert.assertTrue(countDownLatch.await(3, TimeUnit.SECONDS)); }