Skip to content

Commit

Permalink
fix Junit test bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Rookiexu committed Feb 11, 2020
1 parent 23b4dd7 commit b16d950
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions src/test/java/com/lmax/disruptor/RemoveWorkHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class RemoveWorkHandlerTest
public void removeWorkHandlerLostEventExample() throws InterruptedException
{
int eventSize = 8;
Set<Integer> data = initData(0, eventSize);
CountDownLatch countDownLatch = new CountDownLatch(2 * eventSize);
AtomicInteger count = new AtomicInteger();

Expand All @@ -32,7 +31,7 @@ public void removeWorkHandlerLostEventExample() throws InterruptedException

// Build a disruptor and start it.
Disruptor<StubEvent> disruptor = new Disruptor<>(
StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE);
StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE);
RingBuffer<StubEvent> ringBuffer = disruptor.start();

// Construct 2 batch event processors.
Expand All @@ -57,13 +56,17 @@ public void removeWorkHandlerLostEventExample() throws InterruptedException
handler2.awaitStart();

//add event
new MessageProducer(disruptor, data).addEvent();
MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize));
new Thread(producer1).start();
producer1.awaitStart();

// use halt remove a processor will lost a event
processor1.halt();

//Make sure new event are added after processor1.halt()
new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent();
MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize));
new Thread(producer2).start();
producer2.awaitStart();

//waiting remove complete
handler1.awaitShutdown();
Expand Down Expand Up @@ -92,7 +95,7 @@ public void removeWorkHandlerLaterTest() throws InterruptedException

// Build a disruptor and start it.
Disruptor<StubEvent> disruptor = new Disruptor<>(
StubEvent.EVENT_FACTORY, 16, DaemonThreadFactory.INSTANCE);
StubEvent.EVENT_FACTORY, 4, DaemonThreadFactory.INSTANCE);
RingBuffer<StubEvent> ringBuffer = disruptor.start();

// Construct 2 batch event processors.
Expand All @@ -116,14 +119,17 @@ public void removeWorkHandlerLaterTest() throws InterruptedException
handler1.awaitStart();
handler2.awaitStart();

new MessageProducer(disruptor, initData(0, eventSize)).addEvent();
MessageProducer producer1 = new MessageProducer(disruptor, initData(0, eventSize));
new Thread(producer1).start();
producer1.awaitStart();

// haltLater will wait the last event consume
processor1.haltLater();

//Make sure new event are added after processor1.haltLater()
new MessageProducer(disruptor, initData(eventSize, eventSize)).addEvent();

MessageProducer producer2 = new MessageProducer(disruptor, initData(eventSize, eventSize));
new Thread(producer2).start();
producer2.awaitStart();

handler1.awaitShutdown();

Expand All @@ -143,10 +149,11 @@ private Set<Integer> initData(int start, int size)
}
}

class MessageProducer
class MessageProducer implements Runnable
{
Disruptor<StubEvent> disruptor;
Set<Integer> dataSet;
private Disruptor<StubEvent> disruptor;
private Set<Integer> dataSet;
private CountDownLatch startLatch = new CountDownLatch(1);

MessageProducer(Disruptor<StubEvent> disruptor, Set<Integer> dataSet)
{
Expand All @@ -170,6 +177,29 @@ void addEvent()
}
}
}

/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run()
{
startLatch.countDown();
addEvent();
}

public void awaitStart() throws InterruptedException
{
startLatch.await();
}
}

class DynamicHandler implements WorkHandler<StubEvent>, LifecycleAware
Expand Down Expand Up @@ -214,6 +244,10 @@ public void onEvent(StubEvent event) throws Exception
{
countdownlatch.countDown();
completeCount.incrementAndGet();
System.out.println(
"thread id == " + Thread.currentThread().getId() + " ,event ==> " + event.getValue()
);
Thread.yield();
}
}

0 comments on commit b16d950

Please sign in to comment.