Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2319,6 +2319,12 @@
<description>The AbstractFileSystem for gs: uris.</description>
</property>

<property>
<name>fs.azure.enable.readahead</name>
<value>true</value>
<description>Enabled readahead/prefetching in AbfsInputStream.</description>
</property>

<property>
<name>io.seqfile.compress.blocksize</name>
<value>1000000</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
private boolean trackLatency;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
DefaultValue = DEFAULT_ENABLE_READAHEAD)
private boolean enabledReadAhead;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
Expand Down Expand Up @@ -906,6 +911,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio
}
}

public boolean isReadAheadEnabled() {
return this.enabledReadAhead;
}

@VisibleForTesting
void setReadAheadEnabled(final boolean enabledReadAhead) {
this.enabledReadAhead = enabledReadAhead;
}

public int getReadAheadRange() {
return this.readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -226,6 +227,7 @@ public String toString() {
sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
sb.append('}');
return sb.toString();
}
Expand Down Expand Up @@ -1533,6 +1535,11 @@ public boolean hasPathCapability(final Path path, final String capability)
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat,
listener));

// probe for presence of the HADOOP-18546 readahead fix.
case CAPABILITY_SAFE_READAHEAD:
return true;

default:
return super.hasPathCapability(p, capability);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";

/**
* Enable or disable readahead buffer in AbfsInputStream.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";

/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.constants;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Constants which are used internally and which don't fit into the other
* classes.
* For use within the {@code hadoop-azure} module only.
*/
@InterfaceAudience.Private
public final class InternalConstants {

private InternalConstants() {
}

/**
* Does this version of the store have safe readahead?
* Possible combinations of this and the probe
* {@code "fs.capability.etags.available"}.
* <ol>
* <li>{@value}: store is safe</li>
* <li>!etags: store is safe</li>
* <li>etags && !{@value}: store is <i>UNSAFE</i></li>
* </ol>
*/
public static final String CAPABILITY_SAFE_READAHEAD =
"fs.azure.capability.readahead.safe";
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.util.StringUtils.toLowerCase;

/**
Expand Down Expand Up @@ -137,7 +138,7 @@ public AbfsInputStream(
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
this.readAheadEnabled = true;
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.bufferedPreadDisabled = abfsInputStreamContext
Expand Down Expand Up @@ -745,6 +746,11 @@ byte[] getBuffer() {
return buffer;
}

@VisibleForTesting
public boolean isReadAheadEnabled() {
return readAheadEnabled;
}

@VisibleForTesting
public int getReadAheadRange() {
return readAheadRange;
Expand Down Expand Up @@ -823,11 +829,12 @@ public IOStatistics getIOStatistics() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]");
if (streamStatistics != null) {
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append(streamStatistics.toString());
sb.append("}");
sb.append(", ").append(streamStatistics);
}
sb.append("}");
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

private boolean isReadAheadEnabled = true;

private boolean alwaysReadBufferSize;

private int readAheadBlockSize;
Expand Down Expand Up @@ -72,6 +74,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
return this;
}

public AbfsInputStreamContext isReadAheadEnabled(
final boolean isReadAheadEnabled) {
this.isReadAheadEnabled = isReadAheadEnabled;
return this;
}

public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
Expand Down Expand Up @@ -141,6 +149,10 @@ public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}

public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}

public int getReadAheadRange() {
return readAheadRange;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private void init() {

// hide instance constructor
private ReadBufferManager() {
LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
}


Expand Down Expand Up @@ -544,7 +545,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
}

/**
Expand Down Expand Up @@ -642,4 +642,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
freeList.clear();
completedReadList.add(buf);
}

@VisibleForTesting
int getNumBuffers() {
return NUM_BUFFERS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;

/**
* Test read, write and seek.
Expand All @@ -45,20 +48,29 @@
*/
@RunWith(Parameterized.class)
public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
private static final Path TEST_PATH = new Path("/testfile");

@Parameterized.Parameters(name = "Size={0}")
private static final String TEST_PATH = "/testfile";

/**
* Parameterize on read buffer size and readahead.
* For test performance, a full x*y test matrix is not used.
* @return the test parameters
*/
@Parameterized.Parameters(name = "Size={0}-readahead={1}")
public static Iterable<Object[]> sizes() {
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
{DEFAULT_READ_BUFFER_SIZE},
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
{MAX_BUFFER_SIZE}});
return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true},
{DEFAULT_READ_BUFFER_SIZE, false},
{DEFAULT_READ_BUFFER_SIZE, true},
{APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false},
{MAX_BUFFER_SIZE, true}});
}

private final int size;
private final boolean readaheadEnabled;

public ITestAbfsReadWriteAndSeek(final int size) throws Exception {
public ITestAbfsReadWriteAndSeek(final int size,
final boolean readaheadEnabled) throws Exception {
this.size = size;
this.readaheadEnabled = readaheadEnabled;
}

@Test
Expand All @@ -71,17 +83,25 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
abfsConfiguration.setWriteBufferSize(bufferSize);
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);

final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);

try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
Path testPath = path(TEST_PATH);
FSDataOutputStream stream = fs.create(testPath);
try {
stream.write(b);
} finally{
stream.close();
}
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);

final byte[] readBuffer = new byte[2 * bufferSize];
int result;
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
IOStatisticsSource statisticsSource = null;
try (FSDataInputStream inputStream = fs.open(testPath)) {
statisticsSource = inputStream;
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
Expand All @@ -99,6 +119,8 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
}
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);

assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
}
Expand All @@ -109,30 +131,35 @@ public void testReadAheadRequestID() throws java.io.IOException {
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
int bufferSize = MIN_BUFFER_SIZE;
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);

final byte[] b = new byte[bufferSize * 10];
new Random().nextBytes(b);
try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
Path testPath = path(TEST_PATH);
try (FSDataOutputStream stream = fs.create(testPath)) {
((AbfsOutputStream) stream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.WRITE, false, 0,
((AbfsOutputStream) stream.getWrappedStream())
.getStreamID()));
stream.write(b);
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
}


final byte[] readBuffer = new byte[4 * bufferSize];
int result;
fs.registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.OPEN, false, 0));
try (FSDataInputStream inputStream = fs.open(TEST_PATH)) {
try (FSDataInputStream inputStream = fs.open(testPath)) {
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, false, 0,
((AbfsInputStream) inputStream.getWrappedStream())
.getStreamID()));
result = inputStream.read(readBuffer, 0, bufferSize*4);
logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream);
}
fs.registerListener(null);
}
Expand Down
Loading