Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
</excludes>

</configuration>
Expand Down Expand Up @@ -595,6 +596,7 @@
<include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,10 @@ public boolean seekToNewSource(long l) throws IOException {

/**
 * Closes this input stream.
 * Marks the stream as closed, drops the reference to its local buffer and
 * asks the {@link ReadBufferManager} to purge any buffers it still holds
 * for this stream.
 *
 * @throws IOException declared by the overridden method; nothing in this
 * body throws it directly.
 */
@Override
public synchronized void close() throws IOException {
  // Log exactly once, before any state is changed.
  LOG.debug("Closing {}", this);
  closed = true;
  buffer = null; // de-reference the buffer so it can be GC'ed sooner
  ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -456,18 +458,23 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setLength(bytesActuallyRead);
} else {
freeList.push(buffer.getBufferindex());
// buffer will be deleted as per the eviction policy.
// If this buffer has already been purged during
// close of InputStream then we don't update the lists.
if (inProgressList.contains(buffer)) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setLength(bytesActuallyRead);
} else {
freeList.push(buffer.getBufferindex());
// buffer will be deleted as per the eviction policy.
}
// completed list also contains FAILED read buffers
// for sending exception message to clients.
buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
completedReadList.add(buffer);
}

buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
completedReadList.add(buffer);
}

//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
Expand Down Expand Up @@ -502,11 +509,67 @@ int getCompletedReadListSize() {
return completedReadList.size();
}

/**
 * Test hook: takes a snapshot of the completed-read list.
 * The copy is made while holding this manager's monitor.
 *
 * @return a new list containing the current entries of completedReadList.
 */
@VisibleForTesting
public synchronized List<ReadBuffer> getCompletedReadListCopy() {
  final List<ReadBuffer> snapshot = new ArrayList<>(completedReadList);
  return snapshot;
}

/**
 * Test hook: takes a snapshot of the free list of buffer indexes.
 * The copy is made while holding this manager's monitor.
 *
 * @return a new list containing the current entries of freeList.
 */
@VisibleForTesting
public synchronized List<Integer> getFreeListCopy() {
  final List<Integer> snapshot = new ArrayList<>(freeList);
  return snapshot;
}

/**
 * Test hook: takes a snapshot of the read-ahead queue.
 * The copy is made while holding this manager's monitor.
 *
 * @return a new list containing the current entries of readAheadQueue.
 */
@VisibleForTesting
public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
  final List<ReadBuffer> snapshot = new ArrayList<>(readAheadQueue);
  return snapshot;
}

/**
 * Test hook: takes a snapshot of the in-progress list.
 * The copy is made while holding this manager's monitor.
 *
 * @return a new list containing the current entries of inProgressList.
 */
@VisibleForTesting
public synchronized List<ReadBuffer> getInProgressCopiedList() {
  final List<ReadBuffer> snapshot = new ArrayList<>(inProgressList);
  return snapshot;
}

/**
 * Test hook that invokes {@code tryEvict()} on this manager.
 * Any value returned by {@code tryEvict()} is ignored.
 */
@VisibleForTesting
void callTryEvict() {
  this.tryEvict();
}


/**
 * Drops every buffer that belongs to the given {@link AbfsInputStream}
 * from this {@link ReadBufferManager}; called when the stream is closed.
 * Matching entries of the read-ahead queue are discarded, and matching
 * entries of the completed and in-progress lists are removed through
 * {@code purgeList}, which also hands their buffer indexes back to the
 * free list.
 * NOTE(review): a buffer on the in-progress list may still be the target of
 * a running read when its index is returned to the free list here - confirm
 * that the slot cannot be re-issued to another stream before that read ends.
 *
 * @param stream input stream whose buffers are purged; matched by identity.
 */
public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
  LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
  readAheadQueue.removeIf(queued -> {
    return queued.getStream() == stream;
  });
  // Order matters: it determines the order in which indexes reach freeList.
  purgeList(stream, completedReadList);
  purgeList(stream, inProgressList);
}

/**
 * Removes from {@code list} every buffer owned by the given
 * {@link AbfsInputStream} and returns the buffer index of each removed
 * entry to the free list, unless that index is -1.
 * NOTE: this method does no locking of its own; it must only be called
 * while holding the manager's lock (see the synchronized caller).
 *
 * @param stream associated input stream; buffers are matched by identity.
 * @param list list of buffers to scan, such as {@code completedReadList}
 * or {@code inProgressList}.
 */
private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
  Iterator<ReadBuffer> cursor = list.iterator();
  while (cursor.hasNext()) {
    ReadBuffer candidate = cursor.next();
    if (candidate.getStream() != stream) {
      continue;
    }
    cursor.remove();
    // Failed ReadBuffers (bufferIndex = -1) were already pushed to the free
    // list in doneReading, so they must not be pushed a second time here.
    int slot = candidate.getBufferindex();
    if (slot != -1) {
      freeList.push(slot);
    }
  }
}

/**
* Test method that can clean up the current state of readAhead buffers and
* the lists. Will also trigger a fresh init.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.io.IOUtils;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;

/**
 * Integration tests checking that {@link ReadBufferManager} no longer holds
 * buffers for an {@link AbfsInputStream} once that stream has been closed.
 */
public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {

  public ITestReadBufferManager() throws Exception {
  }

  /**
   * Opens, reads from and closes several streams on separate threads, then
   * verifies that every list of the buffer manager is empty and that the
   * free list holds all buffer indexes again.
   */
  @Test
  public void testPurgeBufferManagerForParallelStreams() throws Exception {
    describe("Testing purging of buffers from ReadBufferManager for "
        + "parallel input streams");
    final int numBuffers = 16;
    final int numStreams = 4;
    // Expected free list once everything is purged: indexes 0..numBuffers-1.
    final LinkedList<Integer> freeList = new LinkedList<>();
    for (int i = 0; i < numBuffers; i++) {
      freeList.add(i);
    }
    ExecutorService executorService = Executors.newFixedThreadPool(numStreams);
    AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
    final List<Future<Void>> pendingReads = new ArrayList<>(numStreams);
    try {
      for (int i = 0; i < numStreams; i++) {
        final String fileName = methodName.getMethodName() + i;
        pendingReads.add(executorService.submit((Callable<Void>) () -> {
          byte[] fileContent = getRandomBytesArray(ONE_MB);
          Path testFilePath = createFileWithContent(fs, fileName, fileContent);
          try (FSDataInputStream iStream = fs.open(testFilePath)) {
            iStream.read();
          }
          return null;
        }));
      }
    } finally {
      executorService.shutdown();
    }

    // shutdown() does not wait for submitted tasks. Block until they have
    // all finished so the assertions below see the state after every
    // stream has been closed, not a snapshot taken mid-flight.
    Assertions.assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES))
        .describedAs("All stream tasks should finish within the timeout")
        .isTrue();
    // Surface any exception thrown inside a task instead of losing it.
    for (Future<Void> pendingRead : pendingReads) {
      pendingRead.get();
    }

    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
    assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
    assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
    Assertions.assertThat(bufferManager.getFreeListCopy())
        .describedAs("After closing all streams free list contents should match with " + freeList)
        .hasSize(numBuffers)
        .containsExactlyInAnyOrderElementsOf(freeList);

  }

  /**
   * Asserts that the given buffer list is empty.
   * @param listName name of the list, used in the failure description.
   * @param list snapshot of the list to check.
   */
  private void assertListEmpty(String listName, List<ReadBuffer> list) {
    Assertions.assertThat(list)
        .describedAs("After closing all streams %s should be empty", listName)
        .isEmpty();
  }

  /**
   * Opens and closes two streams one after the other on the same file and
   * verifies after each close that the buffer manager holds no buffers for
   * the closed stream, and that all lists are empty at the end.
   */
  @Test
  public void testPurgeBufferManagerForSequentialStream() throws Exception {
    describe("Testing purging of buffers in ReadBufferManager for "
        + "sequential input streams");
    AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
    final String fileName = methodName.getMethodName();
    byte[] fileContent = getRandomBytesArray(ONE_MB);
    Path testFilePath = createFileWithContent(fs, fileName, fileContent);

    AbfsInputStream iStream1 = null;
    // stream1 will be closed right away.
    try {
      iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
      // Just reading one byte will trigger all read ahead calls.
      iStream1.read();
    } finally {
      IOUtils.closeStream(iStream1);
    }
    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
    AbfsInputStream iStream2 = null;
    try {
      iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
      iStream2.read();
      // After closing stream1, none of the buffers associated with stream1 should be present.
      assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
      assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
      assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
    } finally {
      // closing the stream later.
      IOUtils.closeStream(iStream2);
    }
    // After closing stream2, none of the buffers associated with stream2 should be present.
    assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
    assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
    assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);

    // After closing both the streams, all lists should be empty.
    assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
    assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());

  }


  /**
   * Asserts that no buffer in the list is associated with the given stream.
   * @param list snapshot of a buffer manager list.
   * @param inputStream stream whose buffers must not be present.
   */
  private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
      AbfsInputStream inputStream) {
    for (ReadBuffer buffer : list) {
      Assertions.assertThat(buffer.getStream())
          .describedAs("Buffers associated with closed input streams shouldn't be present")
          .isNotEqualTo(inputStream);
    }
  }

  /**
   * Creates a new filesystem instance from the raw test configuration with
   * a read-ahead queue depth of 8 and with the read buffer size and
   * read-ahead block size both set to {@code MIN_BUFFER_SIZE}.
   * @return a new {@link AzureBlobFileSystem} instance.
   * @throws Exception if the filesystem cannot be created.
   */
  private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
    Configuration conf = getRawConfiguration();
    conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
    conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
    return (AzureBlobFileSystem) FileSystem.newInstance(conf);
  }

  /**
   * Builds a byte array of the requested length filled with random bytes.
   * @param length number of bytes to generate.
   * @return the newly allocated array.
   */
  protected byte[] getRandomBytesArray(int length) {
    final byte[] b = new byte[length];
    new Random().nextBytes(b);
    return b;
  }

  /**
   * Creates a file under the test path and writes the given content to it.
   * @param fs filesystem to create the file on.
   * @param fileName name of the file, resolved through {@code path()}.
   * @param fileContent bytes to write.
   * @return path of the created file.
   * @throws IOException if creating or writing the file fails.
   */
  protected Path createFileWithContent(FileSystem fs, String fileName,
      byte[] fileContent) throws IOException {
    Path testFilePath = path(fileName);
    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
      oStream.write(fileContent);
      oStream.flush();
    }
    return testFilePath;
  }
}