Skip to content

Commit 7828e68

Browse files
committed
Fixed bug in WorkerPool where it would consume messages before they were published in the multi-producer scenario
1 parent 481cc58 commit 7828e68

File tree

3 files changed

+84
-8
lines changed

3 files changed

+84
-8
lines changed

src/main/java/com/lmax/disruptor/WorkProcessor.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,12 @@ public void run()
113113
sequence.set(nextSequence - 1L);
114114
}
115115

116-
sequenceBarrier.waitFor(nextSequence);
117-
event = ringBuffer.get(nextSequence);
118-
workHandler.onEvent(event);
119-
120-
processedSequence = true;
116+
if (sequenceBarrier.waitFor(nextSequence) >= nextSequence)
117+
{
118+
event = ringBuffer.get(nextSequence);
119+
workHandler.onEvent(event);
120+
processedSequence = true;
121+
}
121122
}
122123
catch (final AlertException ex)
123124
{
@@ -128,7 +129,7 @@ public void run()
128129
}
129130
catch (final Throwable ex)
130131
{
131-
// handle, mark as procesed, unless the exception handler threw an exception
132+
// handle, mark as processed, unless the exception handler threw an exception
132133
exceptionHandler.handleEventException(ex, nextSequence, event);
133134
processedSequence = true;
134135
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.lmax.disruptor;
2+
3+
import static org.hamcrest.CoreMatchers.is;
4+
import static org.junit.Assert.assertThat;
5+
6+
import java.util.concurrent.Executor;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.atomic.AtomicLong;
9+
10+
import org.junit.Test;
11+
12+
import com.lmax.disruptor.util.DaemonThreadFactory;
13+
14+
15+
public class WorkerPoolTest
16+
{
17+
@SuppressWarnings("unchecked")
18+
@Test
19+
public void shouldProcessEachMessageByOnlyOneWorker() throws Exception
20+
{
21+
Executor executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
22+
WorkerPool<AtomicLong> pool = new WorkerPool<AtomicLong>(new AtomicLongEventFactory(), new FatalExceptionHandler(),
23+
new AtomicLongWorkHandler(), new AtomicLongWorkHandler());
24+
25+
RingBuffer<AtomicLong> ringBuffer = pool.start(executor);
26+
27+
ringBuffer.next();
28+
ringBuffer.next();
29+
ringBuffer.publish(0);
30+
ringBuffer.publish(1);
31+
32+
Thread.sleep(500);
33+
34+
assertThat(ringBuffer.get(0).get(), is(1L));
35+
assertThat(ringBuffer.get(1).get(), is(1L));
36+
}
37+
38+
@SuppressWarnings("unchecked")
39+
@Test
40+
public void shouldProcessOnlyOnceItHasBeenPublished() throws Exception
41+
{
42+
Executor executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
43+
WorkerPool<AtomicLong> pool = new WorkerPool<AtomicLong>(new AtomicLongEventFactory(), new FatalExceptionHandler(),
44+
new AtomicLongWorkHandler(), new AtomicLongWorkHandler());
45+
46+
RingBuffer<AtomicLong> ringBuffer = pool.start(executor);
47+
48+
ringBuffer.next();
49+
ringBuffer.next();
50+
51+
Thread.sleep(1000);
52+
53+
assertThat(ringBuffer.get(0).get(), is(0L));
54+
assertThat(ringBuffer.get(1).get(), is(0L));
55+
}
56+
57+
private static class AtomicLongWorkHandler implements WorkHandler<AtomicLong>
58+
{
59+
@Override
60+
public void onEvent(AtomicLong event) throws Exception
61+
{
62+
event.incrementAndGet();
63+
}
64+
}
65+
66+
67+
private static class AtomicLongEventFactory implements EventFactory<AtomicLong>
68+
{
69+
@Override
70+
public AtomicLong newInstance()
71+
{
72+
return new AtomicLong(0);
73+
}
74+
}
75+
}

src/test/java/com/lmax/disruptor/dsl/DisruptorTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public class DisruptorTest
6262
private static final int TIMEOUT_IN_SECONDS = 2;
6363
private Disruptor<TestEvent> disruptor;
6464
private StubExecutor executor;
65-
private Collection<DelayedEventHandler> delayedEventHandlers = new ArrayList<DelayedEventHandler>();
66-
private Collection<TestWorkHandler> testWorkHandlers = new ArrayList<TestWorkHandler>();
65+
private final Collection<DelayedEventHandler> delayedEventHandlers = new ArrayList<DelayedEventHandler>();
66+
private final Collection<TestWorkHandler> testWorkHandlers = new ArrayList<TestWorkHandler>();
6767
private RingBuffer<TestEvent> ringBuffer;
6868
private TestEvent lastPublishedEvent;
6969

0 commit comments

Comments
 (0)