Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public long capacity() {

@Override
public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) {
checkParentBreaker();
CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput);
checkParentBreaker(filePath);
return cachedIndexInput;
}

Expand All @@ -79,8 +79,8 @@ public CachedIndexInput compute(
Path key,
BiFunction<? super Path, ? super CachedIndexInput, ? extends CachedIndexInput> remappingFunction
) {
checkParentBreaker();
CachedIndexInput cachedIndexInput = theCache.compute(key, remappingFunction);
checkParentBreaker(key);
return cachedIndexInput;
}

Expand Down Expand Up @@ -201,13 +201,11 @@ public void closeIndexInputReferences() {

/**
* Ensures that the PARENT breaker is not tripped when an entry is added to the cache
* @param filePath the path key for which entry is added
*/
private void checkParentBreaker(Path filePath) {
private void checkParentBreaker() {
try {
circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry");
} catch (CircuitBreakingException ex) {
theCache.remove(filePath);
throw new CircuitBreakingException(
"Unable to create file cache entries",
ex.getBytesWanted(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ private FileCache createFileCache(long capacity) {
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
}

private FileCache createFileCache(long capacity, CircuitBreaker circuitBreaker) {
return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, circuitBreaker);
}

private FileCache createCircuitBreakingFileCache(long capacity) {
TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker();
testCircuitBreaker.startBreaking();
Expand Down Expand Up @@ -200,6 +204,20 @@ public void testComputeThrowCircuitBreakingException() {
assertNull(fileCache.get(path));
}

public void testEntryNotRemovedCircuitBreaker() {
TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
FileCache fileCache = createFileCache(MEGA_BYTES, circuitBreaker);
Path path = createPath("0");
fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES));
// put should succeed since circuit breaker hasn't tripped yet
assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES);
circuitBreaker.startBreaking();
// compute should throw CircuitBreakingException but shouldn't remove entry already present
assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(2 * MEGA_BYTES)));
assertNotNull(fileCache.get(path));
assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES);
}

public void testRemove() {
FileCache fileCache = createFileCache(MEGA_BYTES);
for (int i = 0; i < 4; i++) {
Expand Down
Loading