diff --git a/src/main/java/com/lmax/disruptor/WorkProcessor.java b/src/main/java/com/lmax/disruptor/WorkProcessor.java index aa8efe117..fe058a318 100644 --- a/src/main/java/com/lmax/disruptor/WorkProcessor.java +++ b/src/main/java/com/lmax/disruptor/WorkProcessor.java @@ -91,6 +91,14 @@ public void halt() sequenceBarrier.alert(); } + /** + * remove workProcessor dynamic without message lost + */ + public void haltLater() + { + running.set(false); + } + @Override public boolean isRunning() { @@ -126,8 +134,13 @@ public void run() // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler + if (processedSequence) { + if (!running.get()){ + sequenceBarrier.alert(); + sequenceBarrier.checkAlert(); + } processedSequence = false; do { diff --git a/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java new file mode 100644 index 000000000..dada3dbee --- /dev/null +++ b/src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java @@ -0,0 +1,253 @@ +package com.lmax.disruptor; + +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.support.StubEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @Author : Rookiex + * @Date : Created in 2020/2/11 10:45 + * @Describe : + * @version: + */ +public class RemoveWorkHandlerTest +{ + + @Test + public void removeWorkHandlerLostEventExample() throws InterruptedException + { + int eventSize = 8; + CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize); + AtomicInteger count = new AtomicInteger(); + + ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + // Build a disruptor and start it. + Disruptor disruptor = new Disruptor<>( + StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE); + RingBuffer ringBuffer = disruptor.start(); + + // Construct 2 batch event processors. + DynamicHandler handler1 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor1 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1, new FatalExceptionHandler(), workSequence); + + DynamicHandler handler2 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor2 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler2, new FatalExceptionHandler(), workSequence); + + // Dynamically add both sequences to the ring buffer + ringBuffer.addGatingSequences(processor1.getSequence()); + // Start the new batch processors. + executor.execute(processor1); + + ringBuffer.addGatingSequences(processor2.getSequence()); + executor.execute(processor2); + + //wait handler start + handler1.awaitStart(); + handler2.awaitStart(); + + //add event + MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize)); + new Thread(producer1).start(); + producer1.awaitStart(); + + // use halt remove a processor will lost a event + processor1.halt(); + + //Make sure new event are added after processor1.halt() + MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize)); + new Thread(producer2).start(); + producer2.awaitStart(); + + //waiting remove complete + handler1.awaitShutdown(); + + ringBuffer.removeGatingSequence(processor1.getSequence()); + + //waiting handler consume event(Because there is a event lost, it will be blocked here) + boolean await = countDownLatch.await(3, TimeUnit.SECONDS); + + Assert.assertFalse(await); + long lastCount = countDownLatch.getCount(); + int countValue = count.get(); + Assert.assertEquals(lastCount + countValue, eventSize * 2); + Assert.assertTrue(lastCount > 0); + } + + @Test + public void removeWorkHandlerLaterTest() throws InterruptedException + { + int eventSize = 8; + CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize); + AtomicInteger count = new AtomicInteger(); + + ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + + // Build a disruptor and start it. + Disruptor disruptor = new Disruptor<>( + StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE); + RingBuffer ringBuffer = disruptor.start(); + + // Construct 2 batch event processors. + DynamicHandler handler1 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor1 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1, new FatalExceptionHandler(), workSequence); + + DynamicHandler handler2 = new DynamicHandler(count, countDownLatch); + WorkProcessor processor2 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler2, new FatalExceptionHandler(), workSequence); + + // Dynamically add both sequences to the ring buffer + ringBuffer.addGatingSequences(processor1.getSequence()); + // Start the new batch processors. + executor.execute(processor1); + + ringBuffer.addGatingSequences(processor2.getSequence()); + executor.execute(processor2); + + //wait handler start + handler1.awaitStart(); + handler2.awaitStart(); + + MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize)); + new Thread(producer1).start(); + producer1.awaitStart(); + + // haltLater will wait the last event consume + processor1.haltLater(); + + //Make sure new event are added after processor1.haltLater() + MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize)); + new Thread(producer2).start(); + producer2.awaitStart(); + + handler1.awaitShutdown(); + + ringBuffer.removeGatingSequence(processor1.getSequence()); + + //waiting handler consume event + Assert.assertTrue(countDownLatch.await(3, TimeUnit.SECONDS)); + } + + private Set initData(int start, int size) + { + Set dataSet = new ConcurrentSkipListSet<>(); + for (int i = start; i < size + start; i++) { + dataSet.add(i); + } + return dataSet; + } +} + +class MessageProducer implements Runnable +{ + private Disruptor disruptor; + private Set dataSet; + private CountDownLatch startLatch = new CountDownLatch(1); + + MessageProducer(Disruptor disruptor, Set dataSet) + { + this.disruptor = disruptor; + this.dataSet = dataSet; + } + + void addEvent() + { + for (int i : dataSet) + { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + long sequence = ringBuffer.next(); + try + { + StubEvent event = ringBuffer.get(sequence); + event.setValue(i); + } finally + { + ringBuffer.publish(sequence); + } + } + } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() + { + startLatch.countDown(); + addEvent(); + } + + public void awaitStart() throws InterruptedException + { + startLatch.await(); + } +} + +class DynamicHandler implements WorkHandler, LifecycleAware +{ + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final CountDownLatch startLatch = new CountDownLatch(1); + + private AtomicInteger completeCount; + + private CountDownLatch countdownlatch; + + DynamicHandler(AtomicInteger completeCount, CountDownLatch countdownlatch) + { + this.countdownlatch = countdownlatch; + this.completeCount = completeCount; + } + + @Override + public void onStart() + { + startLatch.countDown(); + } + + @Override + public void onShutdown() + { + shutdownLatch.countDown(); + } + + void awaitShutdown() throws InterruptedException + { + shutdownLatch.await(); + } + + void awaitStart() throws InterruptedException + { + startLatch.await(); + } + + @Override + public void onEvent(StubEvent event) throws Exception + { + countdownlatch.countDown(); + completeCount.incrementAndGet(); + System.out.println( + "thread id == " + Thread.currentThread().getId() + " ,event ==> " + event.getValue() + ); + Thread.yield(); + } +} + diff --git a/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java new file mode 100644 index 000000000..f12b52209 --- /dev/null +++ b/src/test/java/com/lmax/disruptor/example/DynamicAddWorkHandler.java @@ -0,0 +1,128 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.support.StubEvent; +import com.lmax.disruptor.util.DaemonThreadFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @Author : Rookiex + * @Date : Created in 2019/12/31 13:05 + * @Describe : Dynamic WorkHandler + */ +public class DynamicAddWorkHandler +{ + + + private static class MessageProduce implements Runnable + { + Disruptor disruptor; + int start; + int over; + MessageProduce(Disruptor disruptor,int start,int over) + { + this.disruptor = disruptor; + this.start = start; + this.over = over; + } + + @Override + public void run() + { + for (int i = start; i < over + start; i++) + { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + long sequence = ringBuffer.next(); + try + { + StubEvent event = ringBuffer.get(sequence); + event.setTestString("msg => " + i); + } + finally + { + ringBuffer.publish(sequence); + } + } + } + } + + private static class DynamicHandler implements WorkHandler, LifecycleAware + { + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + + + @Override + public void onStart() + { + + } + + @Override + public void onShutdown() + { + shutdownLatch.countDown(); + } + + void awaitShutdown() throws InterruptedException + { + shutdownLatch.await(); + } + + @Override + public void onEvent(StubEvent event) throws Exception + { + System.out.println(event.getTestString() + " ,thread ==> " + Thread.currentThread().getId()); + } + } + + public static void main(String[] args) throws InterruptedException + { + Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); + ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); + + // Build a disruptor and start it. + Disruptor disruptor = new Disruptor<>( + StubEvent.EVENT_FACTORY, 1024, DaemonThreadFactory.INSTANCE); + RingBuffer ringBuffer = disruptor.start(); + + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + // Construct 2 batch event processors. + DynamicAddWorkHandler.DynamicHandler handler1 = new DynamicAddWorkHandler.DynamicHandler(); + WorkProcessor processor1 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler1, new FatalExceptionHandler(), workSequence); + + DynamicAddWorkHandler.DynamicHandler handler2 = new DynamicAddWorkHandler.DynamicHandler(); + WorkProcessor processor2 = + new WorkProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler2, new FatalExceptionHandler(), workSequence); + + // Dynamically add both sequences to the ring buffer + ringBuffer.addGatingSequences(processor1.getSequence()); + // Start the new batch processors. + executor.execute(processor1); + + ringBuffer.addGatingSequences(processor2.getSequence()); + executor.execute(processor2); + + Thread thread1 = new Thread(new MessageProduce(disruptor,0,100)); + thread1.start(); + + Thread.sleep(1000); + + // Remove a processor. + // Stop the processor , processor2.haltLater() will wait for processor2 message processing to complete + processor2.haltLater(); + + Thread thread2 = new Thread(new MessageProduce(disruptor,100,200)); + thread2.start(); + + // Wait for shutdown the complete + handler2.awaitShutdown(); + // Remove the gating sequence from the ring buffer + ringBuffer.removeGatingSequence(processor2.getSequence()); + } +}