Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,15 @@ public List<E> getAtMost(String keyName, int num) throws IOException,
if (numToFill > 0) {
refiller.fillQueueForKey(keyName, ekvs, numToFill);
}
// Asynch task to fill > lowWatermark
if (i <= (int) (lowWatermark * numValues)) {
submitRefillTask(keyName, keyQueue);
}
return ekvs;

break;
} else {
ekvs.add(val);
}
ekvs.add(val);
}
// Schedule a refill task in case queue has gone below the watermark
if (keyQueue.size() < (int) (lowWatermark * numValues)) {
submitRefillTask(keyName, keyQueue);
}
} catch (Exception e) {
throw new IOException("Exception while contacting value generator ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand All @@ -31,7 +32,6 @@
import org.junit.Assert;
import org.junit.Test;

import com.google.common.base.Supplier;
import com.google.common.collect.Sets;

public class TestValueQueue {
Expand Down Expand Up @@ -62,14 +62,26 @@ public FillInfo getTop() throws InterruptedException {
}
}

private void waitForRefill(ValueQueue<?> valueQueue, String queueName, int queueSize)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> {
int size = valueQueue.getSize(queueName);
if (size != queueSize) {
LOG.info("Current ValueQueue size is " + size);
return false;
}
return true;
}, 100, 3000);
}

/**
* Verifies that Queue is initially filled to "numInitValues"
*/
@Test(timeout=30000)
public void testInitFill() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1,
new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
Expand All @@ -83,7 +95,7 @@ public void testInitFill() throws Exception {
public void testWarmUp() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.5f, 300, 1,
new ValueQueue<String>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
vq.initializeQueuesForKeys("k1", "k2", "k3");
FillInfo[] fillInfos =
Expand All @@ -106,14 +118,17 @@ public void testWarmUp() throws Exception {
public void testRefill() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1,
new ValueQueue<String>(100, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (10) and an async refill (91)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);
// Trigger refill
vq.getNext("k1");
Assert.assertEquals(1, filler.getTop().num);
Assert.assertEquals(10, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 100);
// Refill task should add 91 values to get to a full queue (10 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(91, filler.getTop().num);
vq.shutdown();
}

Expand All @@ -125,10 +140,27 @@ public void testRefill() throws Exception {
public void testNoRefill() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.5f, 300, 1,
new ValueQueue<String>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (5) and an async refill (6)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(5, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 6 values to get to a full queue (5 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(6, filler.getTop().num);

// Take another value, queue is still above the watermark
Assert.assertEquals("test", vq.getNext("k1"));

// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertEquals(null, filler.getTop());
vq.shutdown();
}
Expand All @@ -140,11 +172,29 @@ public void testNoRefill() throws Exception {
public void testgetAtMostPolicyALL() throws Exception {
MockFiller filler = new MockFiller();
final ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1,
new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (1) and an async refill (10)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue (1 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(10, filler.getTop().num);

// Drain completely, no further refills triggered
vq.drain("k1");

// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertNull(filler.getTop());

// Synchronous call:
// 1. Synchronously fill returned list
// 2. Start another async task to fill the queue in the cache
Expand All @@ -154,23 +204,16 @@ public void testgetAtMostPolicyALL() throws Exception {
filler.getTop().num);

// Wait for the async task to finish
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int size = vq.getSize("k1");
if (size != 10) {
LOG.info("Current ValueQueue size is " + size);
return false;
}
return true;
}
}, 100, 3000);
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);

// Drain completely after filled by the async thread
Assert.assertEquals("Failed to drain completely after async.", 10,
vq.getAtMost("k1", 10).size());
// Synchronous call (No Async call since num > lowWatermark)
vq.drain("k1");
Assert.assertEquals("Failed to drain completely after async.", 0,
vq.getSize("k1"));

// Synchronous call
Assert.assertEquals("Failed to get all 19.", 19,
vq.getAtMost("k1", 19).size());
Assert.assertEquals("Failed in sync call.", 19, filler.getTop().num);
Expand All @@ -184,14 +227,29 @@ public Boolean get() {
public void testgetAtMostPolicyATLEAST_ONE() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.3f, 300, 1,
new ValueQueue<String>(10, 0.3f, 30000, 1,
SyncGenerationPolicy.ATLEAST_ONE, filler);
// Trigger a prefill (3) and an async refill (8)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(3, filler.getTop().num);
// Drain completely
Assert.assertEquals(2, vq.getAtMost("k1", 10).size());
// Asynch Refill call
Assert.assertEquals(10, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 8 values to get to a full queue (3 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);

// Drain completely, no further refills triggered
vq.drain("k1");

// Queue is empty, sync will return a single value and trigger a refill
Assert.assertEquals(1, vq.getAtMost("k1", 10).size());
Assert.assertEquals(1, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
vq.shutdown();
}

Expand All @@ -202,28 +260,57 @@ public void testgetAtMostPolicyATLEAST_ONE() throws Exception {
public void testgetAtMostPolicyLOW_WATERMARK() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.3f, 300, 1,
new ValueQueue<String>(10, 0.3f, 30000, 1,
SyncGenerationPolicy.LOW_WATERMARK, filler);
// Trigger a prefill (3) and an async refill (8)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(3, filler.getTop().num);
// Drain completely

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 8 values to get to a full queue (3 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals("Failed in async call.", 8, filler.getTop().num);

// Drain completely, no further refills triggered
vq.drain("k1");

// Queue is empty, sync will return 3 values and trigger a refill
Assert.assertEquals(3, vq.getAtMost("k1", 10).size());
// Synchronous call
Assert.assertEquals(1, filler.getTop().num);
// Asynch Refill call
Assert.assertEquals(10, filler.getTop().num);
Assert.assertEquals(3, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue
Assert.assertEquals("Failed in async call.", 10, filler.getTop().num);
vq.shutdown();
}

@Test(timeout=30000)
public void testDrain() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<String>(10, 0.1f, 300, 1,
new ValueQueue<String>(10, 0.1f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
// Trigger a prefill (1) and an async refill (10)
Assert.assertEquals("test", vq.getNext("k1"));
Assert.assertEquals(1, filler.getTop().num);

// Wait for the async task to finish
waitForRefill(vq, "k1", 10);
// Refill task should add 10 values to get to a full queue (1 produced by
// the prefill to the low watermark, 1 consumed by getNext())
Assert.assertEquals(10, filler.getTop().num);

// Drain completely, no further refills triggered
vq.drain("k1");

// Wait a while to make sure that no async refills are triggered
try {
waitForRefill(vq, "k1", 10);
} catch (TimeoutException ignored) {
// This is the correct outcome - no refill is expected
}
Assert.assertNull(filler.getTop());
vq.shutdown();
}
Expand Down