Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2168,9 +2168,8 @@ The switch to turn S3A auditing on or off.

<property>
<name>fs.azure.enable.readahead</name>
<value>false</value>
<description>Disable readahead/prefetching in AbfsInputStream.
See HADOOP-18521</description>
<value>true</value>
<description>Enable readahead/prefetching in AbfsInputStream.</description>
</property>

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = false;
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

private boolean isReadAheadEnabled = false;
private boolean isReadAheadEnabled = true;

private boolean alwaysReadBufferSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
}

/**
Expand Down Expand Up @@ -642,4 +641,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
freeList.clear();
completedReadList.add(buf);
}

/**
 * Test-only accessor that exposes {@code NUM_BUFFERS}.
 *
 * @return the value of {@code NUM_BUFFERS}
 */
@VisibleForTesting
int getNumBuffers() {
return NUM_BUFFERS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
Expand Down Expand Up @@ -69,7 +68,6 @@ protected Configuration createConfiguration() {
protected AbstractFSContract createContract(final Configuration conf) {
conf.setInt(AZURE_READ_AHEAD_RANGE, MIN_BUFFER_SIZE);
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD, true);
return new AbfsFileSystemContract(conf, isSecure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ public class TestAbfsInputStream extends
REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;

/**
 * Runs the parent teardown and then resets the ReadBufferManager obtained
 * from {@code ReadBufferManager.getBufferManager()}.
 *
 * The reset runs in a {@code finally} block so that it still happens when
 * the parent teardown throws; otherwise buffer state left behind by one
 * test could be observed by the next one.
 * NOTE(review): this assumes the manager instance is shared between tests;
 * confirm against ReadBufferManager.
 *
 * @throws Exception if the parent teardown or the reset fails
 */
@Override
public void teardown() throws Exception {
  try {
    super.teardown();
  } finally {
    // Always reset, even when super.teardown() fails.
    ReadBufferManager.getBufferManager().testResetReadBufferManager();
  }
}

private AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
Expand All @@ -106,7 +112,6 @@ private AbfsClient getMockAbfsClient() {
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
mockAbfsClient,
Expand All @@ -132,7 +137,6 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
boolean alwaysReadBufferSize,
int readAheadBlockSize) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
inputStreamContext.isReadAheadEnabled(true);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
abfsClient,
Expand Down Expand Up @@ -495,6 +499,63 @@ public void testSuccessfulReadAhead() throws Exception {
checkEvictedStatus(inputStream, 0, true);
}

/**
* This test expects InProgressList is not purged by the inputStream close.
*/
@Test
public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
final Long serverCommunicationMockLatency = 3_000L;
final Long readBufferTransferToInProgressProbableTime = 1_000L;
final Integer readBufferQueuedCount = 3;

Mockito.doAnswer(invocationOnMock -> {
//sleeping thread to mock the network latency from client to backend.
Thread.sleep(serverCommunicationMockLatency);
return successOp;
})
.when(client)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is trying to unit test a bigger scope: existing in-progress buffers moving to the completed list. It would be nice to scope the test to the inProgressList and freeList counts, before and after close.

At this client.read() mock, I would suggest mocks that invoke a large sleep for each read. That way, after the queueReadAheads call and a 1 sec sleep, 3 buffers will be stuck in inProgressList and the freeList should show 13 free. The asserts should continue to hold the same numbers post close as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made the sleep change, and an assertion on freeList is now also included in the test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very brittle being timing based. normally I'd say "no" here, but I know I have a forthcoming pr which uses object.wait/notify to synchronize
https://github.com/apache/hadoop/pull/5117/files#diff-e829dbaa29faf05ae0b331439e9aec3cd02248464a097c86a0227783337b9b76R370

if this test causes problems it should do the same

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Steve, the sleep time on these mock threads is meant to hold the threads blocked while the test goes ahead with asserts after queueing reads and asserts after close. The sleep of 1 second (which will block the main test thread) after queueing reads is consistent with the timing expectations of pre-existing tests in this class doing the same; however, I agree that this test has a lot more going on beyond the close which needs time synchronization, which can make the test brittle.

Hi Pranav, the test asserts after line 566, starting from the 3 sec sleep, are validations for the correct movement of in-progress buffers to the completed list and their eviction, which is functionality that this PR change does not interfere with. I would suggest that we take them out and evaluate whether pre-existing test coverage already handles it. If there are gaps, let's address them in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I have removed the assertions on the movement from inProgressList to completedList and on the eviction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be slow, but at least there's no assertion that something finishes before a specific timeout. those are the tests which really have problems on slow networks/overloaded systems

.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class), any(TracingContext.class));

AbfsInputStream inputStream = getAbfsInputStream(client,
"testSuccessfulReadAhead.txt");
queueReadAheads(inputStream);

final ReadBufferManager readBufferManager
= ReadBufferManager.getBufferManager();

final int readBufferTotal = readBufferManager.getNumBuffers();

//Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
Thread.sleep(readBufferTransferToInProgressProbableTime);

Assertions.assertThat(readBufferManager.getInProgressCopiedList())
.describedAs("InProgressList should have " + readBufferQueuedCount + " elements")
.hasSize(readBufferQueuedCount);
final int freeListBufferCount = readBufferTotal - readBufferQueuedCount;
Assertions.assertThat(readBufferManager.getFreeListCopy())
.describedAs("FreeList should have " + freeListBufferCount + "elements")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can actually use string.format patterns here; most relevant for on demand toString calls which are more expensive. I'm not worrying about it here though

.hasSize(freeListBufferCount);
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
.describedAs("CompletedList should have 0 elements")
.hasSize(0);

inputStream.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem with the close() here is that it will only be reached if the assertions hold. if anything goes wrong, an exception is raised and the stream kept open, with whatever resources it consumes.

it should be closed in a finally block or the stream opened in a try-with-resources clause. thanks


Assertions.assertThat(readBufferManager.getInProgressCopiedList())
.describedAs("InProgressList should have " + readBufferQueuedCount + " elements")
.hasSize(readBufferQueuedCount);
Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
.describedAs("CompletedList should have 0 elements")
.hasSize(0);
Assertions.assertThat(readBufferManager.getFreeListCopy())
.describedAs("FreeList should have " + freeListBufferCount + " elements")
.hasSize(freeListBufferCount);
}

/**
* This test expects ReadAheadManager to throw exception if the read ahead
* thread had failed within the last thresholdAgeMilliseconds.
Expand Down