diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index f94fdebd03e19..861a05251b9f3 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2319,6 +2319,12 @@ The AbstractFileSystem for gs: uris. + + fs.azure.enable.readahead + true + Enabled readahead/prefetching in AbfsInputStream. + + io.seqfile.compress.blocksize 1000000 diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 50cc57447f92b..e20ac9ef9408d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -297,6 +297,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ABFS_LATENCY_TRACK) private boolean trackLatency; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ENABLE_READAHEAD, + DefaultValue = DEFAULT_ENABLE_READAHEAD) + private boolean enabledReadAhead; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, MinValue = 0, DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) @@ -906,6 +911,15 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio } } + public boolean isReadAheadEnabled() { + return this.enabledReadAhead; + } + + @VisibleForTesting + void setReadAheadEnabled(final boolean enabledReadAhead) { + this.enabledReadAhead = enabledReadAhead; + } + public int getReadAheadRange() { return this.readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 750306c4a983f..3272549b50e27 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -110,6 +110,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator; @@ -226,6 +227,7 @@ public String toString() { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); + sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); sb.append('}'); return sb.toString(); } @@ -1533,6 +1535,11 @@ public boolean hasPathCapability(final Path path, final String capability) new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.HAS_PATH_CAPABILITY, tracingHeaderFormat, 
listener)); + + // probe for presence of the HADOOP-18546 readahead fix. + case CAPABILITY_SAFE_READAHEAD: + return true; + default: return super.hasPathCapability(p, capability); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index d86a3d9684617..85b039b46a402 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -782,6 +782,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled()) .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) .withReadAheadRange(abfsConfiguration.getReadAheadRange()) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 12beb5a9bbabe..4cc4125c8569b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -186,6 +186,13 @@ public final class ConfigurationKeys { public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement"; public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider"; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + + /** + * Enable or disable readahead buffer in AbfsInputStream. + * Value: {@value}. 
+ */ + public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead"; + /** Setting this true will make the driver use it's own RemoteIterator implementation */ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; /** Server side encryption key */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f58c61e8908a6..e68284da6b7d3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -106,6 +106,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; + public static final boolean DEFAULT_ENABLE_READAHEAD = true; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java new file mode 100644 index 0000000000000..12d4f14d92a00 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/InternalConstants.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.constants; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Constants which are used internally and which don't fit into the other + * classes. + * For use within the {@code hadoop-azure} module only. + */ +@InterfaceAudience.Private +public final class InternalConstants { + + private InternalConstants() { + } + + /** + * Does this version of the store have safe readahead? + * Possible combinations of this and the probe + * {@code "fs.capability.etags.available"}. + *
+ * <ol>
+ *   <li>{@value}: store is safe</li>
+ *   <li>!etags: store is safe</li>
+ *   <li>etags && !{@value}: store is UNSAFE</li>
+ * </ol>
+ */ + public static final String CAPABILITY_SAFE_READAHEAD = + "fs.azure.capability.readahead.safe"; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 7033ae9a4a039..14188535b8418 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -50,6 +50,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** @@ -137,7 +138,7 @@ public AbfsInputStream( this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); - this.readAheadEnabled = true; + this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled(); this.alwaysReadBufferSize = abfsInputStreamContext.shouldReadBufferSizeAlways(); this.bufferedPreadDisabled = abfsInputStreamContext @@ -745,6 +746,11 @@ byte[] getBuffer() { return buffer; } + @VisibleForTesting + public boolean isReadAheadEnabled() { + return readAheadEnabled; + } + @VisibleForTesting public int getReadAheadRange() { return readAheadRange; @@ -823,11 +829,12 @@ public IOStatistics getIOStatistics() { @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); + sb.append("[" + CAPABILITY_SAFE_READAHEAD + "]"); if (streamStatistics != null) { - sb.append("AbfsInputStream@(").append(this.hashCode()).append("){"); - sb.append(streamStatistics.toString()); - sb.append("}"); + sb.append(", ").append(streamStatistics); } + sb.append("}"); return sb.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index 55f01bf15bcf7..05afc7b9858da 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; + private boolean isReadAheadEnabled = true; + private boolean alwaysReadBufferSize; private int readAheadBlockSize; @@ -72,6 +74,12 @@ public AbfsInputStreamContext withTolerateOobAppends( return this; } + public AbfsInputStreamContext isReadAheadEnabled( + final boolean isReadAheadEnabled) { + this.isReadAheadEnabled = isReadAheadEnabled; + return this; + } + public AbfsInputStreamContext withReadAheadRange( final int readAheadRange) { this.readAheadRange = readAheadRange; @@ -141,6 +149,10 @@ public boolean isTolerateOobAppends() { return tolerateOobAppends; } + public boolean isReadAheadEnabled() { + return isReadAheadEnabled; + } + public int getReadAheadRange() { return readAheadRange; } diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 4eefb9fdf2c7e..0f91afe0982db 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -101,6 +101,7 @@ private void init() { // hide instance constructor private ReadBufferManager() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); } @@ -544,7 +545,6 @@ public synchronized void purgeBuffersForStream(AbfsInputStream stream) { LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); purgeList(stream, completedReadList); - purgeList(stream, inProgressList); } /** @@ -642,4 +642,9 @@ void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { freeList.clear(); completedReadList.add(buf); } + + @VisibleForTesting + int getNumBuffers() { + return NUM_BUFFERS; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java index f4f0f231037e1..beada775ae87b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -32,11 +32,14 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; /** * Test read, write and seek. @@ -45,20 +48,29 @@ */ @RunWith(Parameterized.class) public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { - private static final Path TEST_PATH = new Path("/testfile"); - - @Parameterized.Parameters(name = "Size={0}") + private static final String TEST_PATH = "/testfile"; + + /** + * Parameterize on read buffer size and readahead. + * For test performance, a full x*y test matrix is not used. 
+ * @return the test parameters + */ + @Parameterized.Parameters(name = "Size={0}-readahead={1}") public static Iterable sizes() { - return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE}, - {DEFAULT_READ_BUFFER_SIZE}, - {APPENDBLOB_MAX_WRITE_BUFFER_SIZE}, - {MAX_BUFFER_SIZE}}); + return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true}, + {DEFAULT_READ_BUFFER_SIZE, false}, + {DEFAULT_READ_BUFFER_SIZE, true}, + {APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false}, + {MAX_BUFFER_SIZE, true}}); } private final int size; + private final boolean readaheadEnabled; - public ITestAbfsReadWriteAndSeek(final int size) throws Exception { + public ITestAbfsReadWriteAndSeek(final int size, + final boolean readaheadEnabled) throws Exception { this.size = size; + this.readaheadEnabled = readaheadEnabled; } @Test @@ -71,17 +83,25 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); abfsConfiguration.setWriteBufferSize(bufferSize); abfsConfiguration.setReadBufferSize(bufferSize); + abfsConfiguration.setReadAheadEnabled(readaheadEnabled); final byte[] b = new byte[2 * bufferSize]; new Random().nextBytes(b); - try (FSDataOutputStream stream = fs.create(TEST_PATH)) { + Path testPath = path(TEST_PATH); + FSDataOutputStream stream = fs.create(testPath); + try { stream.write(b); + } finally{ + stream.close(); } + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); final byte[] readBuffer = new byte[2 * bufferSize]; int result; - try (FSDataInputStream inputStream = fs.open(TEST_PATH)) { + IOStatisticsSource statisticsSource = null; + try (FSDataInputStream inputStream = fs.open(testPath)) { + statisticsSource = inputStream; ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.READ, true, 0, @@ -99,6 +119,8 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception { inputStream.seek(0); result = inputStream.read(readBuffer, 0, bufferSize); } + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); + assertNotEquals("data read in final read()", -1, result); assertArrayEquals(readBuffer, b); } @@ -109,30 +131,35 @@ public void testReadAheadRequestID() throws java.io.IOException { final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); int bufferSize = MIN_BUFFER_SIZE; abfsConfiguration.setReadBufferSize(bufferSize); + abfsConfiguration.setReadAheadEnabled(readaheadEnabled); final byte[] b = new byte[bufferSize * 10]; new Random().nextBytes(b); - try (FSDataOutputStream stream = fs.create(TEST_PATH)) { + Path testPath = path(TEST_PATH); + try (FSDataOutputStream stream = fs.create(testPath)) { ((AbfsOutputStream) stream.getWrappedStream()).registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.WRITE, false, 0, ((AbfsOutputStream) stream.getWrappedStream()) .getStreamID())); stream.write(b); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); } + final byte[] readBuffer = new byte[4 * bufferSize]; int result; fs.registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.OPEN, false, 0)); - try (FSDataInputStream inputStream = fs.open(TEST_PATH)) { + try (FSDataInputStream inputStream = fs.open(testPath)) { ((AbfsInputStream) 
inputStream.getWrappedStream()).registerListener( new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), fs.getFileSystemId(), FSOperationType.READ, false, 0, ((AbfsInputStream) inputStream.getWrappedStream()) .getStreamID())); result = inputStream.read(readBuffer, 0, bufferSize*4); + logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream); } fs.registerListener(null); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java index 8b60dd801cb30..f7d4a5b7a83e7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java @@ -20,14 +20,22 @@ import java.net.URI; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.services.AuthType; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME; +import static org.apache.hadoop.fs.CommonPathCapabilities.FS_ACLS; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; +import static org.junit.Assume.assumeTrue; + /** * Test AzureBlobFileSystem initialization. */ @@ -74,4 +82,28 @@ public void ensureSecureAzureBlobFileSystemIsInitialized() throws Exception { assertNotNull("working directory", fs.getWorkingDirectory()); } } + + @Test + public void testFileSystemCapabilities() throws Throwable { + final AzureBlobFileSystem fs = getFileSystem(); + + final Path p = new Path("}"); + // etags always present + Assertions.assertThat(fs.hasPathCapability(p, ETAGS_AVAILABLE)) + .describedAs("path capability %s in %s", ETAGS_AVAILABLE, fs) + .isTrue(); + // readahead always correct + Assertions.assertThat(fs.hasPathCapability(p, CAPABILITY_SAFE_READAHEAD)) + .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs) + .isTrue(); + + // etags-over-rename and ACLs are either both true or both false. 
+ final boolean etagsAcrossRename = fs.hasPathCapability(p, ETAGS_PRESERVED_IN_RENAME); + final boolean acls = fs.hasPathCapability(p, FS_ACLS); + Assertions.assertThat(etagsAcrossRename) + .describedAs("capabilities %s=%s and %s=%s in %s", + ETAGS_PRESERVED_IN_RENAME, etagsAcrossRename, + FS_ACLS, acls, fs) + .isEqualTo(acls); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java index 006004850d0df..0e7c70e91a9f8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java @@ -130,10 +130,10 @@ public void runCorrelationTestForAllMethods() throws Exception { testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath")); - testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open, + testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open, // read, write ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID")); - testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead) + testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead) ITestAbfsReadWriteAndSeek.class .getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek")); testClasses.put(new ITestAzureBlobFileSystemAppend(), //append diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index 705cc2530d335..a57430fa808cc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -43,9 +44,24 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; +import static org.apache.hadoop.test.LambdaTestUtils.eventually; public class ITestReadBufferManager extends AbstractAbfsIntegrationTest { + /** + * Time before the JUnit test times out for eventually() clauses + * to fail. This copes with slow network connections and debugging + * sessions, yet still allows for tests to fail with meaningful + * messages. + */ + public static final int TIMEOUT_OFFSET = 5 * 60_000; + + /** + * Interval between eventually preobes. 
+ */ + public static final int PROBE_INTERVAL_MILLIS = 1_000; + public ITestReadBufferManager() throws Exception { } @@ -60,6 +76,11 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } ExecutorService executorService = Executors.newFixedThreadPool(4); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); + // verify that the fs has the capability to validate the fix + Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD)) + .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs) + .isTrue(); + try { for (int i = 0; i < 4; i++) { final String fileName = methodName.getMethodName() + i; @@ -74,17 +95,16 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { } } finally { executorService.shutdown(); + // wait for all tasks to finish + executorService.awaitTermination(1, TimeUnit.MINUTES); } ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); + // readahead queue is empty assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); - Assertions.assertThat(bufferManager.getFreeListCopy()) - .describedAs("After closing all streams free list contents should match with " + freeList) - .hasSize(numBuffers) - .containsExactlyInAnyOrderElementsOf(freeList); - + // verify the in progress list eventually empties out. + eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () -> + assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList())); } private void assertListEmpty(String listName, List list) { @@ -116,22 +136,18 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { try { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); iStream2.read(); - // After closing stream1, none of the buffers associated with stream1 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1); + // After closing stream1, no queued buffers of stream1 should be present + // assertions can't be made about the state of the other lists as it is + // too prone to race conditions. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1); } finally { // closing the stream later. IOUtils.closeStream(iStream2); } - // After closing stream2, none of the buffers associated with stream2 should be present. - assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2); - assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2); + // After closing stream2, no queued buffers of stream2 should be present. assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2); - // After closing both the streams, all lists should be empty. - assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy()); - assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()); + // After closing both the streams, read queue should be empty. 
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 5e73d8424ba8b..7d2efa8931f47 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.mockito.Mockito; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -73,6 +74,12 @@ public class TestAbfsInputStream extends REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + @Override + public void teardown() throws Exception { + super.teardown(); + ReadBufferManager.getBufferManager().testResetReadBufferManager(); + } + private AbfsRestOperation getMockRestOp() { AbfsRestOperation op = mock(AbfsRestOperation.class); AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); @@ -384,6 +391,69 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + final Long serverCommunicationMockLatency = 3_000L; + final Long readBufferTransferToInProgressProbableTime = 1_000L; + final Integer readBufferQueuedCount = 3; + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. + Thread.sleep(serverCommunicationMockLatency); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + final int readBufferTotal = readBufferManager.getNumBuffers(); + final int expectedFreeListBufferCount = readBufferTotal + - readBufferQueuedCount; + + try (AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt")) { + // As this is try-with-resources block, the close() method of the created + // abfsInputStream object shall be called on the end of the block. + queueReadAheads(inputStream); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. 
+ Thread.sleep(readBufferTransferToInProgressProbableTime); + + Assertions.assertThat(readBufferManager.getInProgressCopiedList()) + .describedAs(String.format("InProgressList should have %d elements", + readBufferQueuedCount)) + .hasSize(readBufferQueuedCount); + Assertions.assertThat(readBufferManager.getFreeListCopy()) + .describedAs(String.format("FreeList should have %d elements", + expectedFreeListBufferCount)) + .hasSize(expectedFreeListBufferCount); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy()) + .describedAs("CompletedList should have 0 elements") + .hasSize(0); + } + + Assertions.assertThat(readBufferManager.getInProgressCopiedList()) + .describedAs(String.format("InProgressList should have %d elements", + readBufferQueuedCount)) + .hasSize(readBufferQueuedCount); + Assertions.assertThat(readBufferManager.getFreeListCopy()) + .describedAs(String.format("FreeList should have %d elements", + expectedFreeListBufferCount)) + .hasSize(expectedFreeListBufferCount); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy()) + .describedAs("CompletedList should have 0 elements") + .hasSize(0); + } + /** * This test expects ReadAheadManager to throw exception if the read ahead * thread had failed within the last thresholdAgeMilliseconds.