diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md index 518026876ba05..f47001552c2a3 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md @@ -817,7 +817,9 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep This is a critical precondition. Implementations of some FileSystems (e.g. Object stores) could shortcut one round trip by postponing their HTTP GET -operation until the first `read()` on the returned `FSDataInputStream`. +operation until the first `read()` on the returned `FSDataInputStream`. This +lazy open may not surface file non-existence or access permission failures +until the first `read()` of the actual data. However, much client code does depend on the existence check being performed at the time of the `open()` operation. Implementations MUST check for the presence of the file at the time of creation. This does not imply that diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 5df46eb883da1..59150832326de 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -390,6 +390,14 @@ public class AbfsConfiguration{ FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE) private boolean isPaginatedDeleteEnabled; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, + DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM) + private boolean isInputStreamLazyOptimizationEnabled; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_ON_FIRST_READ_ENABLED, + DefaultValue = DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ) + private boolean isPrefetchOnFirstReadEnabled; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1338,6 +1346,14 @@ public boolean getIsChecksumValidationEnabled() { return isChecksumValidationEnabled; } + public boolean isInputStreamLazyOptimizationEnabled() { + return isInputStreamLazyOptimizationEnabled; + } + + public boolean isPrefetchOnFirstReadEnabled() { + return isPrefetchOnFirstReadEnabled; + } + @VisibleForTesting public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) { this.isChecksumValidationEnabled = isChecksumValidationEnabled; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index ac564f082c9e4..e68004278e310 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -856,63 +856,51 @@ public AbfsInputStream openFileForRead(Path path, LOG.debug("openFileForRead filesystem: {} path: {}", client.getFileSystem(), path); - FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus) + final FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus) .orElse(null); 
String relativePath = getRelativePath(path); - String resourceType, eTag; - long contentLength; + String resourceType = null, eTag = null; + long contentLength = -1; ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance(); - /* - * GetPathStatus API has to be called in case of: - * 1. fileStatus is null or not an object of VersionedFileStatus: as eTag - * would not be there in the fileStatus object. - * 2. fileStatus is an object of VersionedFileStatus and the object doesn't - * have encryptionContext field when client's encryptionType is - * ENCRYPTION_CONTEXT. - */ - if ((fileStatus instanceof VersionedFileStatus) && ( - client.getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT - || ((VersionedFileStatus) fileStatus).getEncryptionContext() - != null)) { + String encryptionContext = null; + if (fileStatus instanceof VersionedFileStatus) { + VersionedFileStatus versionedFileStatus + = (VersionedFileStatus) fileStatus; path = path.makeQualified(this.uri, path); Preconditions.checkArgument(fileStatus.getPath().equals(path), - String.format( - "Filestatus path [%s] does not match with given path [%s]", - fileStatus.getPath(), path)); + "Filestatus path [%s] does not match with given path [%s]", + fileStatus.getPath(), path); resourceType = fileStatus.isFile() ? FILE : DIRECTORY; contentLength = fileStatus.getLen(); - eTag = ((VersionedFileStatus) fileStatus).getVersion(); - final String encryptionContext - = ((VersionedFileStatus) fileStatus).getEncryptionContext(); - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { - contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - client.getEncryptionContextProvider(), getRelativePath(path), - encryptionContext.getBytes(StandardCharsets.UTF_8)); - } - } else { - AbfsHttpOperation op = client.getPathStatus(relativePath, false, - tracingContext, null).getResult(); - resourceType = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - contentLength = Long.parseLong( - op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); - eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); - /* - * For file created with ENCRYPTION_CONTEXT, client shall receive - * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT. 
- */ - if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { - final String fileEncryptionContext = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); - if (fileEncryptionContext == null) { + eTag = versionedFileStatus.getVersion(); + encryptionContext = versionedFileStatus.getEncryptionContext(); + } + + if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { + if (encryptionContext == null) { + FileStatusInternal fileStatusInternal = getFileStatusInternal(relativePath, + tracingContext); + resourceType = fileStatusInternal.getResourceType(); + contentLength = Long.parseLong(fileStatusInternal.getContentLength()); + eTag = fileStatusInternal.getETag(); + encryptionContext = fileStatusInternal.getEncryptionContext(); + + if (encryptionContext == null) { LOG.debug("EncryptionContext missing in GetPathStatus response"); throw new PathIOException(path.toString(), "EncryptionContext not present in GetPathStatus response headers"); } - contextEncryptionAdapter = new ContextProviderEncryptionAdapter( - client.getEncryptionContextProvider(), getRelativePath(path), - fileEncryptionContext.getBytes(StandardCharsets.UTF_8)); } + contextEncryptionAdapter = new ContextProviderEncryptionAdapter( + client.getEncryptionContextProvider(), getRelativePath(path), + encryptionContext.getBytes(StandardCharsets.UTF_8)); + } else if (fileStatus == null + && !abfsConfiguration.isInputStreamLazyOptimizationEnabled()) { + FileStatusInternal fileStatusInternal = getFileStatusInternal(relativePath, + tracingContext); + resourceType = fileStatusInternal.getResourceType(); + contentLength = Long.parseLong(fileStatusInternal.getContentLength()); + eTag = fileStatusInternal.getETag(); } if (parseIsDirectory(resourceType)) { @@ -934,6 +922,35 @@ contentLength, populateAbfsInputStreamContext( } } + /** + * Calls pathStatus API on the path and returns the FileStatusInternal which + * contains the ETag, ContentLength, ResourceType and EncryptionContext parsed + * from the API response. + * + * @param relativePath Path to get the status of. + * @param tracingContext TracingContext instance. + * + * @return FileStatusInternal instance containing the ETag, ContentLength, + * ResourceType and EncryptionContext of the path. + * @throws AzureBlobFileSystemException server error. 
+ */ + private FileStatusInternal getFileStatusInternal(String relativePath, + TracingContext tracingContext) throws AzureBlobFileSystemException { + AbfsRestOperation op = client.getPathStatus(relativePath, false, + tracingContext, null); + String contentLength = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH); + String eTag = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.ETAG); + String resourceType = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + String encryptionContext = op.getResult() + .getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); + + return new FileStatusInternal(eTag, contentLength, resourceType, + encryptionContext); + } + private AbfsInputStreamContext populateAbfsInputStreamContext( Optional options, ContextEncryptionAdapter contextEncryptionAdapter) { boolean bufferedPreadDisabled = options @@ -958,6 +975,8 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()) .withBufferedPreadDisabled(bufferedPreadDisabled) .withEncryptionAdapter(contextEncryptionAdapter) .withAbfsBackRef(fsBackRef) + .withPrefetchTriggerOnFirstRead( + abfsConfiguration.isPrefetchOnFirstReadEnabled()) .build(); } @@ -2058,6 +2077,36 @@ public String toString() { } } + private static final class FileStatusInternal { + private String eTag; + private String contentLength; + private String resourceType; + private String encryptionContext; + + private FileStatusInternal(String eTag, String contentLength, String resourceType, String encryptionContext) { + this.eTag = eTag; + this.contentLength = contentLength; + this.resourceType = resourceType; + this.encryptionContext = encryptionContext; + } + + public String getETag() { + return eTag; + } + + public String getContentLength() { + return contentLength; + } + + public String getResourceType() { + return resourceType; + } + + public String getEncryptionContext() { + return encryptionContext; + } + } + /** * Permissions class contain provided permission and umask in octalNotation. * If the object is created for namespace-disabled account, the permission and diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index f16d315e4d62d..de3f6a5da3d47 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -92,6 +92,7 @@ public final class AbfsHttpConstants { public static final String HTTP_HEADER_PREFIX = "x-ms-"; public static final String HASH = "#"; public static final String TRUE = "true"; + public static final String FALSE = "false"; public static final String PLUS_ENCODE = "%20"; public static final String FORWARD_SLASH_ENCODE = "%2F"; @@ -169,6 +170,8 @@ public static ApiVersion getCurrentVersion() { */ public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100; + public static final Integer READ_PATH_REQUEST_NOT_SATISFIABLE = 416; + /** * List of configurations that are related to Customer-Provided-Keys. *
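The two configuration keys introduced in ConfigurationKeys.java below are ordinary per-filesystem ABFS settings. A minimal sketch of enabling them from client code, under the assumption of a standard Hadoop Configuration; the account and container names are placeholders and not part of this patch:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // Skip the HEAD (GetPathStatus) call at open(); contentLength and eTag are
    // learned from the first read response instead (default is false).
    conf.setBoolean("fs.azure.input.stream.lazy.open.optimization.enabled", true);
    // Optionally hold back read-ahead until the first sequential read completes
    // (default is true, i.e. prefetch on the first read).
    conf.setBoolean("fs.azure.prefetch.on.first.read.enabled", false);
    // Placeholder account/container URI for illustration only.
    FileSystem fs = FileSystem.get(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf);

With lazy open enabled, a missing path or an authorization failure may only surface on the first read(), as noted in the filesystem.md change above.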
    diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 55d3f6ab4e2bc..183ff83ed89bc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -321,5 +321,21 @@ public static String accountProperty(String property, String account) { * @see FileSystem#openFile(org.apache.hadoop.fs.Path) */ public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable"; + + /** + * Enable lazy opening of an inputStream. Lazy opening would not call HEAD call + * to get file metadata before creating inputStream. ReadPath API of server + * would give the contentLength and eTag which would be used in subsequent calls + * for if-match headers. + */ + public static final String + FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED = "fs.azure.input.stream.lazy.open.optimization.enabled"; + + /** + * Enable prefetch on the first read to {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}. + * If disabled, first call would not trigger prefetch. Prefetch would be switched on + * after first read call. + */ + public static final String FS_AZURE_PREFETCH_ON_FIRST_READ_ENABLED = "fs.azure.prefetch.on.first.read.enabled"; private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index ade0dc39cfe18..6202687137b7c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -167,5 +167,9 @@ public final class FileSystemConfigurations { public static final int HUNDRED = 100; public static final long THOUSAND = 1000L; + public static final boolean DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM = false; + + public static final boolean DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ = true; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index b3c2b21d3c277..235f5a4e2a4a6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -34,6 +34,7 @@ public final class HttpHeaderConfigurations { public static final String IF_MATCH = "If-Match"; public static final String IF_NONE_MATCH = "If-None-Match"; public static final String CONTENT_LENGTH = "Content-Length"; + public static final String CONTENT_RANGE = "Content-Range"; public static final String CONTENT_ENCODING = "Content-Encoding"; public static final String CONTENT_LANGUAGE = "Content-Language"; public static final String CONTENT_MD5 = "Content-MD5"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java index 201b3bd2e52d7..e67d7df70cd99 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java @@ -34,6 +34,7 @@ public class AbfsRestOperationException extends AzureBlobFileSystemException { private final int statusCode; private final AzureServiceErrorCode errorCode; private final String errorMessage; + private final AbfsHttpOperation abfsHttpOperation; public AbfsRestOperationException( final int statusCode, @@ -45,6 +46,7 @@ public AbfsRestOperationException( this.statusCode = statusCode; this.errorCode = AzureServiceErrorCode.getAzureServiceCode(this.statusCode, errorCode); this.errorMessage = errorMessage; + this.abfsHttpOperation = null; } public AbfsRestOperationException( @@ -55,6 +57,7 @@ public AbfsRestOperationException( final AbfsHttpOperation abfsHttpOperation) { super(formatMessage(abfsHttpOperation), innerException); + this.abfsHttpOperation = abfsHttpOperation; this.statusCode = statusCode; this.errorCode = AzureServiceErrorCode.getAzureServiceCode(this.statusCode, errorCode); this.errorMessage = errorMessage; @@ -66,6 +69,7 @@ public AbfsRestOperationException(final HttpException innerException) { this.statusCode = innerException.getHttpErrorCode(); this.errorCode = AzureServiceErrorCode.UNKNOWN; this.errorMessage = innerException.getMessage(); + this.abfsHttpOperation = null; } public int getStatusCode() { @@ -80,6 +84,10 @@ public String getErrorMessage() { return this.errorMessage; } + public AbfsHttpOperation getAbfsHttpOperation() { + return this.abfsHttpOperation; + } + private static String formatMessage(final AbfsHttpOperation abfsHttpOperation) { // HEAD request response doesn't have StorageErrorCode, StorageErrorMessage. if (abfsHttpOperation.getMethod().equals("HEAD")) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 8ba550e06deb9..c77a833255b0a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; @@ -1148,7 +1149,9 @@ public AbfsRestOperation read(final String path, AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1)); requestHeaders.add(rangeHeader); - requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + if (StringUtils.isNotEmpty(eTag)) { + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + } // Add request header to fetch MD5 Hash of data returned by server. 
if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index cacd3b092eb3f..dc4209217ec7e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; @@ -50,6 +51,9 @@ import static java.lang.Math.max; import static java.lang.Math.min; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.READ_PATH_REQUEST_NOT_SATISFIABLE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; @@ -69,11 +73,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final AbfsClient client; private final Statistics statistics; private final String path; - private final long contentLength; + private volatile long contentLength; private final int bufferSize; // default buffer size private final int footerReadSize; // default buffer size to read when reading footer private final int readAheadQueueDepth; // initialized in constructor - private final String eTag; // eTag of the path when InputStream are created + private String eTag; // eTag of the path when InputStream are created private final boolean tolerateOobAppends; // whether tolerate Oob Appends private final boolean readAheadEnabled; // whether enable readAhead; private final String inputStreamId; @@ -128,6 +132,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, /** ABFS instance to be held by the input stream to avoid GC close. */ private final BackReference fsBackRef; + private volatile boolean fileStatusInformationPresent; + + /** + * Defines if the inputStream has been read sequentially. Prefetches would + * start only after the first successful sequential read. 
+ */ + private boolean sequentialReadStarted = false; + private final boolean prefetchTriggerOnFirstRead; + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -145,6 +158,9 @@ public AbfsInputStream( this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; + this.fileStatusInformationPresent = StringUtils.isNotEmpty(eTag); + this.prefetchTriggerOnFirstRead = + abfsInputStreamContext.isPrefetchTriggerOnFirstRead(); this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled(); this.alwaysReadBufferSize @@ -199,6 +215,7 @@ public int read(long position, byte[] buffer, int offset, int length) throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } } + LOG.debug("pread requested offset = {} len = {} bufferedPreadDisabled = {}", offset, length, bufferedPreadDisabled); if (!bufferedPreadDisabled) { @@ -233,7 +250,8 @@ public int read() throws IOException { } @Override - public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + public synchronized int read(final byte[] b, final int off, final int len) + throws IOException { // check if buffer is null before logging the length if (b != null) { LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, @@ -276,9 +294,9 @@ public synchronized int read(final byte[] b, final int off, final int len) throw limit = 0; bCursor = 0; } - if (shouldReadFully()) { + if (shouldReadFully(currentLen)) { lastReadBytes = readFileCompletely(b, currentOff, currentLen); - } else if (shouldReadLastBlock()) { + } else if (shouldReadLastBlock(currentLen)) { lastReadBytes = readLastBlock(b, currentOff, currentLen); } else { lastReadBytes = readOneBlock(b, currentOff, currentLen); @@ -292,16 +310,43 @@ public synchronized int read(final byte[] b, final int off, final int len) throw break; } } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; + int result = totalReadBytes > 0 ? totalReadBytes : lastReadBytes; + sequentialReadStarted = true; + return result; } - private boolean shouldReadFully() { + private boolean shouldReadFully(int lengthToRead) { + if (!hasFileStatusInfo()) { + /* + * In case the fileStatus information is not available, the content length + * of the file is not known at this instant. In such cases, it would be marked + * for full read optimization if the length to read is less than the difference + * between the buffer size and the current cursor position. This implies that + * in such case a read of first block of file can be done. + * + * After the read, the contentLength would be updated and that would be used + * in future reads. + */ + return (lengthToRead + fCursor) <= this.bufferSize + && this.firstRead && this.context.readSmallFilesCompletely(); + } + return this.firstRead && this.context.readSmallFilesCompletely() - && this.contentLength <= this.bufferSize; + && getContentLength() <= this.bufferSize; } - private boolean shouldReadLastBlock() { - long footerStart = max(0, this.contentLength - FOOTER_SIZE); + private boolean shouldReadLastBlock(int lengthToRead) { + if (!hasFileStatusInfo()) { + /* + * In case the fileStatus information is not available, the content length + * of the file is not known at this instant. 
In such cases, it would be marked + * for footer read optimization if the length to read is less than the footer size + */ + return this.fCursor >= 0 && lengthToRead <= FOOTER_SIZE && this.firstRead + && this.context.optimizeFooterRead(); + } + + long footerStart = max(0, getContentLength() - FOOTER_SIZE); return this.firstRead && this.context.optimizeFooterRead() && this.fCursor >= footerStart; } @@ -316,7 +361,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO //If buffer is empty, then fill the buffer. if (bCursor == limit) { //If EOF, then return -1 - if (fCursor >= contentLength) { + if (hasFileStatusInfo() && fCursor >= getContentLength()) { return -1; } @@ -354,7 +399,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO fCursor += bytesRead; fCursorAfterLastRead = fCursor; } - return copyToUserBuffer(b, off, len); + return copyToUserBuffer(b, off, len, false); } private int readFileCompletely(final byte[] b, final int off, final int len) @@ -369,7 +414,10 @@ private int readFileCompletely(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, bCursor has // to be the current fCusor bCursor = (int) fCursor; - return optimisedRead(b, off, len, 0, contentLength); + if (!hasFileStatusInfo()) { + return optimisedRead(b, off, len, 0, bufferSize, true); + } + return optimisedRead(b, off, len, 0, getContentLength(), false); } // To do footer read of files when enabled. @@ -385,22 +433,36 @@ private int readLastBlock(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, // AbfsInutStream buffer is going to contain data from last block start. In // that case bCursor will be set to fCursor - lastBlockStart - long lastBlockStart = max(0, contentLength - footerReadSize); + if (!hasFileStatusInfo()) { + long lastBlockStart = max(0, (fCursor + len) - footerReadSize); + bCursor = (int) (fCursor - lastBlockStart); + return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, footerReadSize), true); + } + long lastBlockStart = max(0, getContentLength() - footerReadSize); bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize - long actualLenToRead = min(footerReadSize, contentLength); - return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); + long actualLenToRead = min(footerReadSize, getContentLength()); + return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false); } private int optimisedRead(final byte[] b, final int off, final int len, - final long readFrom, final long actualLen) throws IOException { + final long readFrom, final long actualLen, + final boolean isOptimizedReadWithoutContentLengthInformation) throws IOException { fCursor = readFrom; int totalBytesRead = 0; int lastBytesRead = 0; try { buffer = new byte[bufferSize]; + boolean fileStatusInformationPresentBeforeRead = hasFileStatusInfo(); + /* + * Content length would not be available for the first optimized read in case + * of lazy head optimization in inputStream. In such case, read of the first optimized read + * would be done without the contentLength constraint. Post first call, the contentLength + * would be present and should be used for further reads. 
+ */ for (int i = 0; - i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) { + i < MAX_OPTIMIZED_READ_ATTEMPTS && (!hasFileStatusInfo() + || fCursor < getContentLength()); i++) { lastBytesRead = readInternal(fCursor, buffer, limit, (int) actualLen - limit, true); if (lastBytesRead > 0) { @@ -408,8 +470,22 @@ private int optimisedRead(final byte[] b, final int off, final int len, limit += lastBytesRead; fCursor += lastBytesRead; fCursorAfterLastRead = fCursor; + + if (shouldBreakLazyOptimizedRead((int) actualLen, totalBytesRead, + fileStatusInformationPresentBeforeRead)) { + break; + } } } + } catch (FileNotFoundException ex) { + /* + * FileNotFoundException in AbfsInputStream read can happen only in case of + * lazy optimization enabled. In such case, the contentLength is not known + * before opening the inputStream, and the first read can give a + * FileNotFoundException, and if this exception is raised, it has to be + * thrown back to the application and make a readOneBlock call. + */ + throw ex; } catch (IOException e) { LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e); restorePointerState(); @@ -424,11 +500,45 @@ private int optimisedRead(final byte[] b, final int off, final int len, // If the read was partial and the user requested part of data has // not read then fallback to readoneblock. When limit is smaller than // bCursor that means the user requested data has not been read. - if (fCursor < contentLength && bCursor > limit) { + if (fCursor < getContentLength() && bCursor > limit) { restorePointerState(); return readOneBlock(b, off, len); } - return copyToUserBuffer(b, off, len); + return copyToUserBuffer(b, off, len, isOptimizedReadWithoutContentLengthInformation); + } + + /** + * In non-lazily opened inputStream, the contentLength would be available before + * opening the inputStream. In such case, optimized read would always be done + * on the last part of the file. + * + * In lazily opened inputStream, the contentLength would not be available before + * opening the inputStream. In such case, contentLength conditioning would not be + * applied to execute optimizedRead. Hence, the optimized read may not be done on the + * last part of the file. If the optimized read is done on the non-last part of the + * file, inputStream should read only the amount of data requested by optimizedRead, + * as the buffer supplied would be only of the size of the data requested by optimizedRead. + * + * @param actualLen actual length to read. + * @param totalBytesRead total bytes read. + * @param fileStatusInformationPresentBeforeRead file status information present before read. + * + * @return true if should break lazy optimized read, false otherwise. 
+ */ + private boolean shouldBreakLazyOptimizedRead(final int actualLen, + final int totalBytesRead, + final boolean fileStatusInformationPresentBeforeRead) { + return !fileStatusInformationPresentBeforeRead + && totalBytesRead == actualLen; + } + + @VisibleForTesting + long getContentLength() { + return contentLength; + } + + private boolean hasFileStatusInfo() { + return fileStatusInformationPresent; } private void savePointerState() { @@ -456,8 +566,7 @@ private boolean validate(final byte[] b, final int off, final int len) Preconditions.checkNotNull(b); LOG.debug("read one block requested b.length = {} off {} len {}", b.length, off, len); - - if (this.available() == 0) { + if (hasFileStatusInfo() && this.available() == 0) { return false; } @@ -467,7 +576,21 @@ private boolean validate(final byte[] b, final int off, final int len) return true; } - private int copyToUserBuffer(byte[] b, int off, int len){ + private int copyToUserBuffer(byte[] b, int off, int len, + final boolean isOptimizedReadWithoutContentLengthInfo){ + /* + * If the ABFS is running with head optimization for opening InputStream, the + * application can give invalid indexes such that the required data is out of file length, + * but there can be a part of optimized read which can be in the file, and can be + * read in the AbfsInputStream buffer. But since, the application has asked for + * invalid indexes, it will receive a -1. + */ + if (isOptimizedReadWithoutContentLengthInfo && bCursor > limit) { + bCursor = limit; + nextReadPos = getContentLength(); + return -1; + } + //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) //(bytes returned may be less than requested) int bytesRemaining = limit - bCursor; @@ -488,7 +611,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){ private int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { + if (readAheadEnabled && !bypassReadAhead && effectiveReadAhead()) { // try reading from read-ahead if (offset != 0) { throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); @@ -500,11 +623,11 @@ private int readInternal(final long position, final byte[] b, final int offset, long nextOffset = position; // First read to queue needs to be of readBufferSize and later // of readAhead Block size - long nextSize = min((long) bufferSize, contentLength - nextOffset); + long nextSize = min((long) bufferSize, getContentLength() - nextOffset); LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); TracingContext readAheadTracingContext = new TracingContext(tracingContext); readAheadTracingContext.setPrimaryRequestID(); - while (numReadAheads > 0 && nextOffset < contentLength) { + while (numReadAheads > 0 && nextOffset < getContentLength()) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, @@ -512,7 +635,7 @@ private int readInternal(final long position, final byte[] b, final int offset, nextOffset = nextOffset + nextSize; numReadAheads--; // From next round onwards should be of readahead block size. 
- nextSize = min((long) readAheadBlockSize, contentLength - nextOffset); + nextSize = min((long) readAheadBlockSize, getContentLength() - nextOffset); } // try reading from buffers first @@ -536,11 +659,26 @@ private int readInternal(final long position, final byte[] b, final int offset, } } + /** + * ReadAhead can happen only if the sequential read has started or if the + * inputStream has the HEAD information of the path and the config + * {@link org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_PREFETCH_ON_FIRST_READ_ENABLED} + * is enabled. In case of lazy head optimization, the contentLength is not known + * before opening the inputStream. In such case, the readAhead can happen only + * after the first successful sequential read. + * + * @return true if readAhead can be triggered, false otherwise. + */ + private boolean effectiveReadAhead() { + return (prefetchTriggerOnFirstRead && hasFileStatusInfo()) + || sequentialReadStarted; + } + int readRemote(long position, byte[] b, int offset, int length, TracingContext tracingContext) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } - if (position >= contentLength) { + if (hasFileStatusInfo() && position >= getContentLength()) { return -1; // Hadoop prefers -1 to EOFException } if (b == null) { @@ -556,6 +694,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } final AbfsRestOperation op; + AbfsHttpOperation abfsHttpOperation = null; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { if (streamStatistics != null) { @@ -565,6 +704,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get(), contextEncryptionAdapter, tracingContext); + abfsHttpOperation = op.getResult(); cachedSasToken.update(op.getSasToken()); LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); @@ -576,8 +716,24 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new FileNotFoundException(ere.getMessage()); } + /* + * Status 416 is sent when read range is out of contentLength range. + * This would happen only in the case if contentLength is not known before + * opening the inputStream. + */ + if (ere.getStatusCode() == READ_PATH_REQUEST_NOT_SATISFIABLE + && !hasFileStatusInfo()) { + LOG.error("Read range is out of contentLength range. " + + "This can happen only in case of lazy head optimization. 
" + + "Path" + path + " position " + position + " length " + length, ere); + return -1; + } } throw new IOException(ex); + } finally { + if (!hasFileStatusInfo() && abfsHttpOperation != null) { + initPropertiesFromReadResponseHeader(abfsHttpOperation); + } } long bytesRead = op.getResult().getBytesReceived(); if (streamStatistics != null) { @@ -591,6 +747,23 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t return (int) bytesRead; } + private void initPropertiesFromReadResponseHeader(final AbfsHttpOperation op) throws IOException { + validateFileResourceTypeAndParseETag(op); + contentLength = parseFromRange( + op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE)); + } + + private long parseFromRange(final String responseHeader) { + if (StringUtils.isEmpty(responseHeader)) { + return -1; + } + String[] parts = responseHeader.split(FORWARD_SLASH); + if (parts.length != 2) { + return -1; + } + return Long.parseLong(parts[1]); + } + /** * Increment Read Operations. */ @@ -615,7 +788,7 @@ public synchronized void seek(long n) throws IOException { if (n < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } - if (n > contentLength) { + if (hasFileStatusInfo() && n > getContentLength()) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } @@ -634,7 +807,7 @@ public synchronized long skip(long n) throws IOException { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } long currentPos = getPos(); - if (currentPos == contentLength) { + if (hasFileStatusInfo() && currentPos == getContentLength()) { if (n > 0) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } @@ -644,8 +817,8 @@ public synchronized long skip(long n) throws IOException { newPos = 0; n = newPos - currentPos; } - if (newPos > contentLength) { - newPos = contentLength; + if (hasFileStatusInfo() && newPos > getContentLength()) { + newPos = getContentLength(); n = newPos - currentPos; } seek(newPos); @@ -667,24 +840,29 @@ public synchronized int available() throws IOException { throw new IOException( FSExceptionMessages.STREAM_IS_CLOSED); } - final long remaining = this.contentLength - this.getPos(); + if (!hasFileStatusInfo()) { + AbfsRestOperation op = client.getPathStatus(path, false, tracingContext, + null); + validateFileResourceTypeAndParseETag(op.getResult()); + contentLength = Long.parseLong( + op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + } + final long remaining = getContentLength() - this.getPos(); return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; } - /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, - * they wont be reflected in the returned length. - * - * @return length of the file. - * @throws IOException if the stream is closed - */ - public long length() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + private void validateFileResourceTypeAndParseETag(final AbfsHttpOperation op) + throws FileNotFoundException { + if (DIRECTORY.equals( + op + .getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) { + throw new FileNotFoundException( + "read must be used with files and not directories. 
Path: " + path); } - return contentLength; + eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); + fileStatusInformationPresent = true; } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index fdcad5ac3a0d0..621c5275b05ec 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -62,6 +62,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private ContextEncryptionAdapter contextEncryptionAdapter = null; + private boolean prefetchTriggerOnFirstRead; + public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -150,6 +152,12 @@ public AbfsInputStreamContext withEncryptionAdapter( return this; } + public AbfsInputStreamContext withPrefetchTriggerOnFirstRead( + final boolean prefetchTriggerOnFirstRead) { + this.prefetchTriggerOnFirstRead = prefetchTriggerOnFirstRead; + return this; + } + public AbfsInputStreamContext build() { if (readBufferSize > readAheadBlockSize) { LOG.debug( @@ -220,4 +228,8 @@ public BackReference getFsBackRef() { public ContextEncryptionAdapter getEncryptionAdapter() { return contextEncryptionAdapter; } + + public boolean isPrefetchTriggerOnFirstRead() { + return prefetchTriggerOnFirstRead; + } } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 37904808ec659..cbeeffb067011 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -994,6 +994,19 @@ to 100 MB). The default value will be 8388608 (8 MB). bytes. The value should be between 16384 to 104857600 both inclusive (16 KB to 100 MB). The default value will be 4194304 (4 MB). +`fs.azure.input.stream.lazy.open.optimization.enabled`: Enable lazy open of the +inputStream. This would prevent the metadata server call for opening inputStream. +The metadata call is called to get the contentLength and eTag of the inputStream +path. This information is available in the read server response. The first read +call would populate these fields in the inputStream for the subsequent read calls. +If this config is true, the first read call would not make read-ahead calls. After +the first read call, the read-ahead mechanism would get enabled. The default value +will be false. + +`fs.azure.prefetch.on.first.read.enabled`: If disabled, the first read call on the +inputStream will not make read-ahead calls. After first read call, the read-ahead +mechanism will get enabled. The default value will be true. + `fs.azure.read.alwaysReadBufferSize`: Read request size configured by `fs.azure.read.request.size` will be honoured only when the reads done are in sequential pattern. 
When the read pattern is detected to be random, read size diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java index 0997b3dbd44d4..8a0b9911b19e9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.assertj.core.api.AbstractDoubleAssert; import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; @@ -101,11 +102,24 @@ public void testAbfsHttpCallsDurations() throws IOException { */ private void assertDurationTracker(IOStatistics ioStatistics) { for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) { - Assertions.assertThat(lookupMeanStatistic(ioStatistics, + AbstractDoubleAssert doubleAssert = Assertions.assertThat(lookupMeanStatistic(ioStatistics, abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean()) .describedAs("The DurationTracker Named " + abfsStatistic.getStatName() - + " Doesn't match the expected value.") - .isGreaterThan(0.0); + + " Doesn't match the expected value."); + if (abfsStatistic == HTTP_HEAD_REQUEST && getConfiguration().isInputStreamLazyOptimizationEnabled()) { + /* + * In an environment where this is the only test running, there would be no + * head call for the fileSystem as the inputStream would be lazily opened. + * + * But, in an environment where there are multiple tests running in parallel + * using same fileSystem instance, there could be few head calls from those tests. + * + * Hence, asserting for greater than or equal to 0.0. + */ + doubleAssert.isGreaterThanOrEqualTo(0.0); + } else { + doubleAssert.isGreaterThan(0.0); + } + } } - } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index afc92c111a913..29500edcfbfd9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -226,15 +226,28 @@ public void testReadStatistics() throws IOException { * readOps - Since each time read operation is performed OPERATIONS * times, total number of read operations would be equal to OPERATIONS. * - * remoteReadOps - Only a single remote read operation is done. Hence, + * remoteReadOps - + * In case of Head Optimization and footer optimization enabled, and readSmallFile + * disabled for InputStream, the first read operation would read only the asked range + * and would not be able to read the entire file as it has no information on the contentLength + * of the file. The second read would be able to read entire file (1MB) in buffer. Hence, + * total remote read ops would be 2. + * In case of no Head Optimization for InputStream, it is aware of the contentLength and + * only a single remote read operation is done. Hence, * total remote read ops is 1. - * + * In case of Head Optimization enabled and readSmallFile enabled for inputStream, + * on the first read request from the application, it will read first readBufferSize + * block of the file, which would contain the whole file. 
Hence, total remote read ops + * would be 1. */ assertEquals("Mismatch in bytesRead value", OPERATIONS, stats.getBytesRead()); assertEquals("Mismatch in readOps value", OPERATIONS, stats.getReadOperations()); - assertEquals("Mismatch in remoteReadOps value", 1, + assertEquals("Mismatch in remoteReadOps value", + getConfiguration().isInputStreamLazyOptimizationEnabled() + && !getConfiguration().readSmallFilesCompletely() + && getConfiguration().optimizeFooterRead() ? 2 : 1, stats.getRemoteReadOperations()); in.close(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index 66b8da89572a1..7af8b31ac0c57 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -221,8 +221,14 @@ public void testAbfsHttpResponseStatistics() throws IOException { in = fs.open(getResponsePath); // Network stats calculation: For Creating AbfsInputStream: // 1 GetFileStatus request to fetch file size = 1 connection and 1 get response - expectedConnectionsMade++; - expectedGetResponses++; + if (!getConfiguration().isInputStreamLazyOptimizationEnabled()) { + /* + * If head optimization is enabled, getFileStatus is not called. Hence, there + * would be no connection made and get response for the operation 'open'. + */ + expectedConnectionsMade++; + expectedGetResponses++; + } // -------------------------------------------------------------------- // Operation: Read @@ -231,7 +237,27 @@ public void testAbfsHttpResponseStatistics() throws IOException { // 1 read request = 1 connection and 1 get response expectedConnectionsMade++; expectedGetResponses++; - expectedBytesReceived += bytesWrittenToFile; + if (!getConfiguration().isInputStreamLazyOptimizationEnabled() + || !getConfiguration().optimizeFooterRead() + || (getConfiguration().readSmallFilesCompletely() + && getConfiguration().getReadBufferSize() >= bytesWrittenToFile)) { + expectedBytesReceived += bytesWrittenToFile; + } else { + /* + * With head optimization enabled and footer optimization enabled + * and read full optimization disabled, the abfsInputStream is not aware + * of the contentLength and hence, it would only read data for which the + * range is provided. With the first remote call done, the inputStream will + * get aware of the contentLength and would be able to use it for further reads. + * + * At this point, the inputStream is at position 0 and the read request from + * application is 1 Byte. If the read full-file optimization is enabled, + * the inputStream would attempt to read the first readBuffer block + * from the file, which would read the whole file as the fileContentLength + * is smaller than the readBuffer size. + */ + expectedBytesReceived += 1; + } // -------------------------------------------------------------------- // Assertions @@ -271,8 +297,14 @@ public void testAbfsHttpResponseStatistics() throws IOException { in = fs.open(getResponsePath); // Network stats calculation: For Creating AbfsInputStream: // 1 GetFileStatus for file size = 1 connection and 1 get response - expectedConnectionsMade++; - expectedGetResponses++; + if (!getConfiguration().isInputStreamLazyOptimizationEnabled()) { + /* + * If head optimization is enabled, getFileStatus is not called. 
Hence, there + * would be no connection made and get response for the operation 'open'. + */ + expectedConnectionsMade++; + expectedGetResponses++; + } // -------------------------------------------------------------------- // Operation: Read diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java index c59009dd0feb4..9315a49422896 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.UUID; @@ -326,7 +327,15 @@ private void executeOp(Path reqPath, AzureBlobFileSystem fs, fs.open(reqPath); break; case Open: - fs.open(reqPath); + try(InputStream is = fs.open(reqPath)) { + if (getConfiguration().isInputStreamLazyOptimizationEnabled()) { + try { + is.read(); + } catch (IOException ex) { + throw (IOException) ex.getCause(); + } + } + } break; case DeletePath: fs.delete(reqPath, false); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index fd5d312176321..1d22ce4928c09 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -78,6 +79,34 @@ public ITestAzureBlobFileSystemDelete() throws Exception { super(); } + + @Override + public AzureBlobFileSystem getFileSystem() throws IOException { + if (!getConfiguration().isInputStreamLazyOptimizationEnabled()) { + return super.getFileSystem(); + } + try { + AzureBlobFileSystem fs = super.getFileSystem(); + AzureBlobFileSystem spiedFs = Mockito.spy(fs); + Mockito.doAnswer(answer -> { + Path path = (Path) answer.getArgument(0); + FileStatus status = fs.getFileStatus(path); + if (status.isDirectory()) { + throw new FileNotFoundException(path.toString()); + } + return fs.openFile(path) + .withFileStatus(status) + .build() + .join(); + }).when(spiedFs).open(Mockito.any(Path.class)); + + Mockito.doNothing().when(spiedFs).close(); + return spiedFs; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + @Test public void testDeleteRoot() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); @@ -225,7 +254,8 @@ public void testDeleteIdempotency() throws Exception { @Test public void testDeleteIdempotencyTriggerHttp404() throws Exception { - final AzureBlobFileSystem fs = getFileSystem(); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + getRawConfiguration()); AbfsClient client = ITestAbfsClient.createTestClientFromCurrentContext( fs.getAbfsStore().getClient(), this.getConfiguration()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index 940d56fecb438..57a1a59e8396b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -192,7 +192,11 @@ public void testSkipBounds() throws Exception { Path testPath = path(TEST_FILE_PREFIX + "_testSkipBounds"); long testFileLength = assumeHugeFileExists(testPath); - try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { + try (FSDataInputStream inputStream = this.getFileSystem() + .openFile(testPath) + .withFileStatus(getFileSystem().getFileStatus(testPath)) + .build() + .get()) { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); long skipped = inputStream.skip(-1); @@ -232,8 +236,13 @@ public Long call() throws Exception { public void testValidateSeekBounds() throws Exception { Path testPath = path(TEST_FILE_PREFIX + "_testValidateSeekBounds"); long testFileLength = assumeHugeFileExists(testPath); + FileStatus status = getFileSystem().getFileStatus(testPath); - try (FSDataInputStream inputStream = this.getFileSystem().open(testPath)) { + try (FSDataInputStream inputStream = this.getFileSystem() + .openFile(testPath) + .withFileStatus(status) + .build() + .get()) { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); inputStream.seek(0); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java index 1319ea44c7c07..19e6be24e28ad 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/AbfsFileSystemContract.java @@ -18,12 +18,26 @@ package org.apache.hadoop.fs.azurebfs.contract; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import org.mockito.Mockito; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.contract.AbstractBondedFSContract; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM; + /** * Azure BlobFileSystem Contract. Test paths are created using any maven fork * identifier, if defined. 
This guarantees paths unique to tests @@ -61,4 +75,59 @@ public String toString() { sb.append('}'); return sb.toString(); } + + @Override + public FileSystem getTestFileSystem() throws IOException { + final FileSystem fileSystem = super.getTestFileSystem(); + if (!getConf().getBoolean( + FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)) { + return fileSystem; + } + try { + AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem; + AzureBlobFileSystem spiedFs = Mockito.spy(fs); + Mockito.doAnswer(answer -> { + Path path = (Path) answer.getArgument(0); + FileStatus status = fs.getFileStatus(path); + + try { + return fs.openFile(path) + .withFileStatus(status) + .build() + .join(); + } catch (CompletionException ex) { + throw ex.getCause(); + } + }).when(spiedFs).open(Mockito.any(Path.class)); + + Mockito.doAnswer(answer -> { + Path path = (Path) answer.getArgument(0); + try { + FileStatus fileStatus = fs.getFileStatus(path); + FutureDataInputStreamBuilder builder = Mockito.spy( + fs.openFile(path).withFileStatus(fileStatus)); + Mockito.doAnswer(builderBuild -> { + return fs.openFile(path).withFileStatus(fileStatus).build(); + }).when(builder).build(); + return builder; + } catch (IOException ex) { + CompletableFuture future + = new CompletableFuture<>(); + future.completeExceptionally(ex); + + FutureDataInputStreamBuilder builder = Mockito.spy(fs.openFile(path)); + Mockito.doAnswer(futureAnswer -> { + futureAnswer.callRealMethod(); + return future; + }).when(builder).build(); + return builder; + } + }).when(spiedFs).openFile(Mockito.any(Path.class)); + + Mockito.doNothing().when(spiedFs).close(); + return spiedFs; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index f7fe5039799d7..347f2b3c3d103 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; import org.apache.hadoop.fs.contract.AbstractContractSeekTest; @@ -127,27 +128,65 @@ public void testSeekAndReadWithReadAhead() throws IOException { assertDataAtPos(newSeek, (byte) in.read()); assertSeekBufferStats(1, streamStatistics.getSeekInBuffer()); remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); - assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, - remoteReadOperationsNewVal); + boolean isPrefetchSwitchedOffForFirstRead + = !((AzureBlobFileSystem) getFileSystem()).getAbfsStore() + .getAbfsConfiguration() + .isPrefetchOnFirstReadEnabled(); + /* + * If prefetchReadAheadOnFirstRead is switched off, there will be no + * prefetch on the first read call. So the process would be having only data + * 0 to readAheadRange in memory. There would not have been any prefetch + * for [readAheadRange, 2*readAheadRange] range. 
So, for the read of [readAheadRange, + * readAheadRange + 1] range, there would be 2 remote read operation, as the + * prefetch logic will get switched on. + */ + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal + ( + isPrefetchSwitchedOffForFirstRead + ? 2 : 0), + remoteReadOperationsNewVal); remoteReadOperationsOldVal = remoteReadOperationsNewVal; + if (isPrefetchSwitchedOffForFirstRead) { + newSeek = 2 * inStream.getReadAheadRange() + 1; + /* + * This read will be getting data for [2 * readAheadRange, 2 * readAheadRange + 1] from + * readAheadBuffers. But the read will execute a prefetch for [3 * readAheadRange, 4 * readAheadRange - 1]. + */ + inStream.seek(newSeek); + assertGetPosition(newSeek, in.getPos()); + assertDataAtPos(newSeek, (byte) in.read()); + assertSeekBufferStats(1, streamStatistics.getSeekInBuffer()); + remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal + 1, + remoteReadOperationsNewVal); + remoteReadOperationsOldVal = remoteReadOperationsNewVal; + } + // Seeking just after read ahead range. Read from buffer. newSeek = inStream.getReadAheadRange() + 1; + /* + * If the testcase for switched off prefetchReadAheadOnFirstRead is run, then + * the memory would have data from [2 * readAheadRange, 4 * ReadAheadRange - 1]. + * So, when read has to be done on [readAheadRange + 1, readAheadRange + 2], + * it would be a new read and not from already buffered data. + */ + int seekCounter = isPrefetchSwitchedOffForFirstRead ? 1 : 2; in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read()); - assertSeekBufferStats(2, streamStatistics.getSeekInBuffer()); + assertSeekBufferStats(seekCounter, streamStatistics.getSeekInBuffer()); remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); - assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, + assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal + (isPrefetchSwitchedOffForFirstRead ? 1:0), remoteReadOperationsNewVal); remoteReadOperationsOldVal = remoteReadOperationsNewVal; // Seeking just 10 more bytes such that data is read from buffer. newSeek += 10; + seekCounter++; in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read()); - assertSeekBufferStats(3, streamStatistics.getSeekInBuffer()); + assertSeekBufferStats(seekCounter, streamStatistics.getSeekInBuffer()); remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal); @@ -158,7 +197,7 @@ public void testSeekAndReadWithReadAhead() throws IOException { in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read()); - assertSeekBufferStats(3, streamStatistics.getSeekInBuffer()); + assertSeekBufferStats(seekCounter, streamStatistics.getSeekInBuffer()); remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); assertIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal); @@ -166,10 +205,11 @@ public void testSeekAndReadWithReadAhead() throws IOException { // Seeking just 10 more bytes such that data is read from buffer. 
newSeek += 10; + seekCounter++; in.seek(newSeek); assertGetPosition(newSeek, in.getPos()); assertDataAtPos(newSeek, (byte) in.read()); - assertSeekBufferStats(4, streamStatistics.getSeekInBuffer()); + assertSeekBufferStats(seekCounter, streamStatistics.getSeekInBuffer()); remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); assertNoIncrementInRemoteReadOps(remoteReadOperationsOldVal, remoteReadOperationsNewVal); @@ -184,7 +224,7 @@ public void testSeekAndReadWithReadAhead() throws IOException { // Adding one as one byte is already read // after the last seek is done. assertGetPosition(oldSeek + 1, in.getPos()); - assertSeekBufferStats(4, streamStatistics.getSeekInBuffer()); + assertSeekBufferStats(seekCounter, streamStatistics.getSeekInBuffer()); assertDatasetEquals(newSeek, "Read across read ahead ", bytes, bytes.length); remoteReadOperationsNewVal = streamStatistics.getRemoteReadOperations(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index d14ac05d5f5aa..e2b84bc32cf7f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -18,11 +18,16 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; @@ -31,8 +36,12 @@ import org.assertj.core.api.Assertions; import org.junit.Test; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -123,12 +132,39 @@ public void testAzureBlobFileSystemBackReferenceInInputStream() } } + @Test + public void testDirectoryReadWithHeadOptimization() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.setBoolean( + FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, true); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + Path path = new Path("/testPath"); + fs.mkdirs(path); + try (FSDataInputStream in = fs.open(path)) { + intercept(FileNotFoundException.class, () -> in.read()); + } + } + + @Test + public void testInvalidPathReadWithHeadOptimization() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.setBoolean( + FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, true); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + Path path = new 
Path("/testPath"); + try (FSDataInputStream in = fs.open(path)) { + intercept(FileNotFoundException.class, () -> in.read()); + } + } + private void testExceptionInOptimization(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, final byte[] fileContent) - throws IOException { + throws IOException, ExecutionException, InterruptedException { - FSDataInputStream iStream = fs.open(testFilePath); + FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); + builder.withFileStatus(fs.getFileStatus(testFilePath)); + FSDataInputStream iStream = builder.build().get(); try { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java index c7c9da94ab2ed..0abf167d8cf71 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java @@ -48,7 +48,12 @@ import static java.lang.Math.max; import static java.lang.Math.min; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FALSE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED; @@ -330,12 +335,19 @@ private void seekReadAndTest(final AzureBlobFileSystem fs, byte[] buffer = new byte[length]; long bytesRead = iStream.read(buffer, 0, length); - long footerStart = max(0, - actualContentLength - AbfsInputStream.FOOTER_SIZE); - boolean optimizationOn = - conf.optimizeFooterRead() && seekPos >= footerStart; + final boolean optimizationOn; + long actualLength; + + if (getConfiguration().isInputStreamLazyOptimizationEnabled()) { + optimizationOn = conf.optimizeFooterRead() && length <= AbfsInputStream.FOOTER_SIZE; + } else { + long footerStart = max(0, + actualContentLength - AbfsInputStream.FOOTER_SIZE); + optimizationOn = + conf.optimizeFooterRead() && seekPos >= footerStart; + } + actualLength = length; - long actualLength = length; if (seekPos + length > actualContentLength) { long delta = seekPos + length - actualContentLength; actualLength = length - delta; @@ -343,7 +355,68 @@ private void seekReadAndTest(final AzureBlobFileSystem fs, long expectedLimit; long expectedBCursor; long expectedFCursor; - if (optimizationOn) { + if (getConfiguration().isInputStreamLazyOptimizationEnabled() + && optimizationOn) { + // For file smaller than the footerReadBufferSize. 
+      if (seekPos + actualLength <= footerReadBufferSize) {
+        /*
+         * If the intended read would end beyond the contentLength range, the
+         * footer read can only return the data that lies within contentLength.
+         *
+         * If the intended read ends within the contentLength range, the read
+         * can return all of the required data.
+         */
+        if (seekPos + length > actualContentLength) {
+          /*
+           * The end of the required data lies beyond the contentLength, so the
+           * footer read can only fetch the data within the contentLength range.
+           * This is reflected in the expectedLimit of the buffer read.
+           */
+          long footerReadStart = max(0,
+              seekPos + length - footerReadBufferSize);
+          /*
+           * The amount of data filled into the buffer equals the number of
+           * bytes between footerReadStart and contentLength.
+           */
+          expectedLimit = actualContentLength - footerReadStart;
+          /*
+           * The buffer pointer that marks the start of the actually required
+           * data equals the number of bytes between footerReadStart and seekPos.
+           */
+          expectedBCursor = seekPos - footerReadStart;
+        } else {
+          /*
+           * This executes a read from the start of the file up to
+           * (seekPos + actualLength), because footerReadBufferSize is bigger
+           * than (seekPos + actualLength).
+           */
+          expectedLimit = seekPos + actualLength;
+          expectedBCursor = seekPos;
+        }
+      } else {
+        // File size is bigger than footerReadBufferSize.
+
+        if (seekPos + length > actualContentLength) {
+          // Only part of the footerReadBufferSize range is read, because part
+          // of the required data lies outside the contentLength range.
+          long footerReadStart = seekPos + length - footerReadBufferSize;
+          expectedLimit = actualContentLength - footerReadStart;
+          expectedBCursor = seekPos - footerReadStart;
+        } else {
+          // The full footerReadBufferSize range is read.
+ expectedLimit = footerReadBufferSize; + expectedBCursor = footerReadBufferSize - actualLength; + } + } + long bytesRemaining = expectedLimit - expectedBCursor; + long bytesToRead = min(actualLength, bytesRemaining); + expectedBCursor += bytesToRead; + expectedFCursor = seekPos + actualLength; + } else if (!getConfiguration().isInputStreamLazyOptimizationEnabled() + && optimizationOn) { if (actualContentLength <= footerReadBufferSize) { expectedLimit = actualContentLength; expectedBCursor = seekPos + actualLength; @@ -374,7 +447,11 @@ private void seekReadAndTest(final AzureBlobFileSystem fs, // Verify data read to AbfsInputStream buffer int from = seekPos; if (optimizationOn) { - from = (int) max(0, actualContentLength - footerReadBufferSize); + if (!getConfiguration().isInputStreamLazyOptimizationEnabled()) { + from = (int) max(0, actualContentLength - footerReadBufferSize); + } else { + from = (int) (expectedFCursor - expectedLimit); + } } abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(), abfsInputStream.getBuffer(), testFilePath); @@ -417,14 +494,14 @@ private void validatePartialReadWithNoData(final AzureBlobFileSystem spiedFs, validatePartialReadWithNoData(spiedFs, testFilePath, fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent, footerReadBufferSize, readBufferSize); + fileContent, footerReadBufferSize, fileSize, readBufferSize); } } } private void validatePartialReadWithNoData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent, int footerReadBufferSize, final int readBufferSize) throws IOException { + final byte[] fileContent, int footerReadBufferSize, final int fileSize, final int readBufferSize) throws IOException { FSDataInputStream iStream = fs.open(testFilePath); try { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream @@ -437,6 +514,7 @@ private void validatePartialReadWithNoData(final FileSystem fs, doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream) .readRemote(anyLong(), any(), anyInt(), anyInt(), any(TracingContext.class)); + doReturn((long) fileSize).when(abfsInputStream).getContentLength(); iStream = new FSDataInputStream(abfsInputStream); abfsInputStreamTestUtils.seek(iStream, seekPos); @@ -444,7 +522,8 @@ private void validatePartialReadWithNoData(final FileSystem fs, byte[] buffer = new byte[length]; int bytesRead = iStream.read(buffer, 0, length); assertEquals(length, bytesRead); - abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, length, buffer, testFilePath); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, seekPos, + length, buffer, testFilePath); assertEquals(fileContent.length, abfsInputStream.getFCursor()); assertEquals(length, abfsInputStream.getBCursor()); assertTrue(abfsInputStream.getLimit() >= length); @@ -546,7 +625,9 @@ public void testFooterReadBufferSizeConfiguration() throws Exception { // Verify that value set in config is used if builder is not used AbfsConfiguration spiedConfig = fs.getAbfsStore().getAbfsConfiguration(); - Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize(); + Mockito.doReturn(footerReadBufferSizeConfig) + .when(spiedConfig) + .getFooterReadBufferSize(); iStream = fs.open(testFilePath); verifyConfigValueInStream(iStream, footerReadBufferSizeConfig); @@ -560,19 +641,63 @@ public void testFooterReadBufferSizeConfiguration() throws Exception { // Verify that when builder is used value set 
in parameters is used // even if config is set - Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize(); + Mockito.doReturn(footerReadBufferSizeConfig) + .when(spiedConfig) + .getFooterReadBufferSize(); iStream = builder.build().get(); verifyConfigValueInStream(iStream, footerReadBufferSizeBuilder); // Verify that when the builder is used and parameter in builder is not set, // the value set in configuration is used - Mockito.doReturn(footerReadBufferSizeConfig).when(spiedConfig).getFooterReadBufferSize(); + Mockito.doReturn(footerReadBufferSizeConfig) + .when(spiedConfig) + .getFooterReadBufferSize(); builder = fs.openFile(testFilePath); iStream = builder.build().get(); verifyConfigValueInStream(iStream, footerReadBufferSizeConfig); } } + @Test + public void testHeadOptimizationPerformingOutOfRangeRead() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, TRUE); + configuration.set(AZURE_READ_SMALL_FILES_COMPLETELY, FALSE); + configuration.set(AZURE_READ_OPTIMIZE_FOOTER_READ, TRUE); + + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration)) { + int footerBufferRead = getConfiguration().getFooterReadBufferSize(); + Path testFilePath = createPathAndFileWithContent(fs, 0, footerBufferRead); + try (FSDataInputStream iStream = fs.open(testFilePath)) { + iStream.seek(2 * footerBufferRead - AbfsInputStream.FOOTER_SIZE + 1); + byte[] buffer = new byte[AbfsInputStream.FOOTER_SIZE]; + int bytesRead = iStream.read(buffer, 0, AbfsInputStream.FOOTER_SIZE); + assertEquals(-1, bytesRead); + } + + try (FSDataInputStream iStream = fs.open(testFilePath)) { + iStream.seek(footerBufferRead + AbfsInputStream.FOOTER_SIZE); + byte[] buffer = new byte[AbfsInputStream.FOOTER_SIZE]; + int bytesRead = iStream.read(buffer, 0, AbfsInputStream.FOOTER_SIZE); + assertEquals(-1, bytesRead); + assertEquals(footerBufferRead, iStream.getPos()); + + int expectedReadLen = footerBufferRead - (2 * AbfsInputStream.FOOTER_SIZE); + iStream.seek(2 * AbfsInputStream.FOOTER_SIZE); + buffer = new byte[expectedReadLen]; + bytesRead = iStream.read(buffer, 0, expectedReadLen); + assertEquals(expectedReadLen, bytesRead); + + AbfsInputStream abfsInputStream + = (AbfsInputStream) iStream.getWrappedStream(); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); + assertEquals(1, streamStatistics.getSeekInBuffer()); + assertEquals(1, streamStatistics.getRemoteReadOperations()); + } + } + } + private void verifyConfigValueInStream(final FSDataInputStream inputStream, final int expectedValue) { AbfsInputStream stream = (AbfsInputStream) inputStream.getWrappedStream(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java index 64fac9ca94ed8..7197fbe2abdf9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java @@ -22,7 +22,9 @@ import java.util.Map; import org.junit.Test; +import org.mockito.Mockito; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,12 @@
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
+import static java.lang.Math.min;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FALSE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -218,14 +226,29 @@ private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos,
     final int readBufferSize = conf.getReadBufferSize();
     final int fileContentLength = fileContent.length;
-    final boolean smallFile = fileContentLength <= readBufferSize;
+    final boolean smallFile;
+    final boolean headOptimization = getConfiguration().isInputStreamLazyOptimizationEnabled();
+
+    if (headOptimization) {
+      smallFile = ((seekPos + length) <= readBufferSize);
+    } else {
+      smallFile = fileContentLength <= readBufferSize;
+    }
     int expectedLimit, expectedFCursor;
     int expectedBCursor;
     if (conf.readSmallFilesCompletely() && smallFile) {
       abfsInputStreamTestUtils.assertAbfsInputStreamBufferNotEqualToContentStartSubsequence(fileContent, abfsInputStream, conf, testFilePath);
-      expectedFCursor = fileContentLength;
-      expectedLimit = fileContentLength;
-      expectedBCursor = seekPos + length;
+      /*
+       * If head optimization is enabled, the stream can apply the full-file
+       * read optimization on the first read when seekPos is less than
+       * readBufferSize and the length is such that (seekPos + length) is
+       * within readBufferSize. Since the stream is unaware of the
+       * contentLength, it tries to read the full buffer size.
+       *
+       * If head optimization is enabled and readBufferSize < fileContentLength,
+       * the stream reads only readBufferSize bytes and sets its internal
+       * pointers to the end of the read buffer.
+       */
+      expectedFCursor = min(readBufferSize, fileContentLength);
+      expectedLimit = min(readBufferSize, fileContentLength);
+      expectedBCursor = min(readBufferSize, seekPos + length);
     } else {
       if ((seekPos == 0)) {
         abfsInputStreamTestUtils.assertAbfsInputStreamBufferNotEqualToContentStartSubsequence(fileContent, abfsInputStream, conf, testFilePath);
@@ -275,12 +298,16 @@ private void partialReadWithNoData(final FileSystem fs,
     AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
         .getWrappedStream();
     abfsInputStream = spy(abfsInputStream);
-    doReturn(10)
-        .doReturn(10)
-        .doCallRealMethod()
-        .when(abfsInputStream)
-        .readRemote(anyLong(), any(), anyInt(), anyInt(),
-            any(TracingContext.class));
+    Mockito.doReturn((long) fileContent.length).when(abfsInputStream).getContentLength();
+    int[] readRemoteIteration = {0};
+    Mockito.doAnswer(answer -> {
+      readRemoteIteration[0]++;
+      if (readRemoteIteration[0] <= 2) {
+        return 10;
+      }
+      return answer.callRealMethod();
+    }).when(abfsInputStream).readRemote(anyLong(), any(), anyInt(), anyInt(),
+        any(TracingContext.class));
 
     iStream = new FSDataInputStream(abfsInputStream);
     abfsInputStreamTestUtils.seek(iStream, seekPos);
@@ -355,6 +382,144 @@ private void partialReadWithSomeData(final FileSystem fs,
     }
   }
 
+  /**
+   * Test that the full-file read optimization is executed when head
+   * optimization is enabled and the file is smaller than the buffer size.
+   *
+   * The read call sent to the inputStream is for the full buffer size; assert
+   * that the inputStream correctly fills the application buffer and also fills
+   * the inputStream buffer with the file content. Any subsequent read should
+   * be served from the inputStream buffer.
+   */
+  @Test
+  public void testHeadOptimizationOnFileLessThanBufferSize() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, TRUE);
+    configuration.set(AZURE_READ_SMALL_FILES_COMPLETELY, TRUE);
+    try (FileSystem fs = FileSystem.newInstance(configuration)) {
+      int readBufferSize = getConfiguration().getReadBufferSize();
+      byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+          readBufferSize / 2);
+      Path path = abfsInputStreamTestUtils.createFileWithContent(fs,
+          methodName.getMethodName(), fileContent);
+
+      try (FSDataInputStream is = fs.open(path)) {
+        is.seek(readBufferSize / 4);
+        byte[] buffer = new byte[readBufferSize / 2];
+        int readLength = is.read(buffer, 0, readBufferSize / 2);
+        assertEquals(readLength, readBufferSize / 4);
+        abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent,
+            readBufferSize / 4, readLength,
+            buffer, path);
+
+        is.seek(0);
+        readLength = is.read(buffer, 0, readBufferSize / 2);
+        assertEquals(readLength, readBufferSize / 2);
+        abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, 0,
+            readLength, buffer, path);
+
+        AbfsInputStream abfsInputStream
+            = (AbfsInputStream) is.getWrappedStream();
+        AbfsInputStreamStatisticsImpl streamStatistics =
+            (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
+        assertEquals(1, streamStatistics.getSeekInBuffer());
+        assertEquals(1, streamStatistics.getRemoteReadOperations());
+      }
+    }
+  }
+
+  /**
+   * Test that the full-file read optimization is executed with head
+   * optimization enabled on a file whose contentLength is larger than the
+   * readBufferSize, but where the read call leaves the final fCursor below
+   * the read buffer length.
+ */ + @Test + public void testHeadOptimizationOnFileBiggerThanBufferSize() + throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, TRUE); + configuration.set(AZURE_READ_SMALL_FILES_COMPLETELY, TRUE); + try (FileSystem fs = FileSystem.newInstance(configuration)) { + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray( + readBufferSize); + Path path = abfsInputStreamTestUtils.createFileWithContent(fs, + methodName.getMethodName(), fileContent); + + try (FSDataInputStream is = fs.open(path)) { + is.seek(readBufferSize / 4); + byte[] buffer = new byte[readBufferSize / 2]; + int readLength = is.read(buffer, 0, readBufferSize / 2); + assertEquals(readLength, readBufferSize / 2); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, + readBufferSize / 4, readLength, buffer, path); + + readLength = is.read(buffer, 0, readBufferSize / 4); + assertEquals(readLength, readBufferSize / 4); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, + 3 * readBufferSize / 4, readLength, buffer, path); + + AbfsInputStream abfsInputStream + = (AbfsInputStream) is.getWrappedStream(); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); + assertEquals(1, streamStatistics.getSeekInBuffer()); + assertEquals(1, streamStatistics.getRemoteReadOperations()); + } + + try (FSDataInputStream is = fs.open(path)) { + is.seek(readBufferSize / 2); + byte[] buffer = new byte[readBufferSize]; + int readLength = is.read(buffer, 0, readBufferSize / 2); + + assertEquals(readLength, readBufferSize / 2); + abfsInputStreamTestUtils.assertContentReadCorrectly(fileContent, + readBufferSize / 2, readLength, buffer, path); + + byte[] zeroBuffer = new byte[readBufferSize / 2]; + abfsInputStreamTestUtils.assertContentReadCorrectly(buffer, + readBufferSize / 2, readBufferSize / 2, zeroBuffer, path); + } + } + } + + @Test + public void testHeadOptimizationPerformingOutOfRangeRead() + throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_INPUT_STREAM_LAZY_OPEN_OPTIMIZATION_ENABLED, + TRUE); + configuration.set(AZURE_READ_SMALL_FILES_COMPLETELY, TRUE); + configuration.set(AZURE_READ_OPTIMIZE_FOOTER_READ, FALSE); + + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + configuration)) { + int readBufferSize = getConfiguration().getReadBufferSize(); + byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray( + readBufferSize / 2); + Path path = abfsInputStreamTestUtils.createFileWithContent(fs, + methodName.getMethodName(), fileContent); + + try (FSDataInputStream is = fs.open(path)) { + is.seek(readBufferSize / 2 + 1); + byte[] buffer = new byte[readBufferSize / 2]; + int readLength = is.read(buffer, 0, readBufferSize / 2 - 1); + assertEquals(readLength, -1); + + is.seek(0); + readLength = is.read(buffer, 0, readBufferSize / 2); + assertEquals(readLength, readBufferSize / 2); + + AbfsInputStream abfsInputStream + = (AbfsInputStream) is.getWrappedStream(); + AbfsInputStreamStatisticsImpl streamStatistics = + (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); + assertEquals(1, streamStatistics.getSeekInBuffer()); + assertEquals(1, streamStatistics.getRemoteReadOperations()); + } + } + } + private enum SeekTo {BEGIN, MIDDLE, END} } diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java index 25f33db1cae9e..9c78860d6d1ca 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java @@ -73,18 +73,43 @@ public void testPositionedRead() throws IOException { // Read only 10 bytes from offset 0. But by default it will do the seek // and read where the entire 100 bytes get read into the // AbfsInputStream buffer. + + boolean readOnlyBytesToReadData = + getConfiguration().isInputStreamLazyOptimizationEnabled() + && !getConfiguration().readSmallFilesCompletely() + && getConfiguration().optimizeFooterRead(); + /* + * If the head optimization and footer optimization is enabled, and readSmallFile + * optimization is disabled, the first read would read the given number of bytes + * only in the first read call. Reason being, due to the head optimization it would not + * know the contentLength and because the byteLength required is less than the + * footerReadThreshold, it would read from the starting of the file to the given + * byteLength only. + * + * At this point, the inputStream is at position 0 and the read request from + * application is 10 Byte. If the read full-file optimization is enabled, + * the inputStream would attempt to read the first readBuffer block + * from the file, which would read the whole file as the fileContentLength + * is smaller than the readBuffer size. + */ Assertions .assertThat(Arrays.copyOfRange( ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0, - TEST_FILE_DATA_SIZE)) + readOnlyBytesToReadData ? bytesToRead : TEST_FILE_DATA_SIZE)) .describedAs( "AbfsInputStream pread did not read more data into its buffer") - .containsExactly(data); + .containsExactly(readOnlyBytesToReadData ? Arrays.copyOfRange(data, 0, bytesToRead) + : data); // Check statistics assertStatistics(inputStream.getIOStatistics(), bytesToRead, 1, 1, - TEST_FILE_DATA_SIZE); + readOnlyBytesToReadData ? bytesToRead : TEST_FILE_DATA_SIZE); readPos = 50; + /* + * In case of the head optimization enabled, the second read would read the remaining bytes of file + * after the given readPos (this would become read of TEST_FILE_DATA_SIZE - readPos). + * In case of the head optimization disabled, the first read would have read the entire file. + */ Assertions .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead)) .describedAs( @@ -95,8 +120,11 @@ public void testPositionedRead() throws IOException { .containsExactly( Arrays.copyOfRange(data, readPos, readPos + bytesToRead)); // Check statistics - assertStatistics(inputStream.getIOStatistics(), 2 * bytesToRead, 2, 1, - TEST_FILE_DATA_SIZE); + assertStatistics(inputStream.getIOStatistics(), 2 * bytesToRead, 2, + readOnlyBytesToReadData ? 2 : 1, + readOnlyBytesToReadData + ? TEST_FILE_DATA_SIZE - readPos + bytesToRead + : TEST_FILE_DATA_SIZE); // Did positioned read from pos 0 and then 50 but the stream pos should // remain at 0. 
Assertions.assertThat(inputStream.getPos()) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index e4ed9881ffa4f..7d899ef921945 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -124,7 +124,9 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, null, FORWARD_SLASH + fileName, THREE_KB, - inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), + inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10) + .withReadAheadBlockSize(ONE_KB) + .withPrefetchTriggerOnFirstRead(true), "eTag", getTestTracingContext(null, false)); @@ -250,11 +252,14 @@ private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class), any( ContextEncryptionAdapter.class)); - // verify GetPathStatus invoked when FileStatus not provided + // verify GetPathStatus invoked when FileStatus not provided and the head optimization is disabled abfsStore.openFileForRead(testFile, Optional.empty(), null, tracingContext); - verify(mockClient, times(1).description( + verify(mockClient, times( + getConfiguration().isInputStreamLazyOptimizationEnabled() + ? 0 + : 1).description( "GetPathStatus should be invoked when FileStatus not provided")) .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class), nullable( ContextEncryptionAdapter.class));
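Taken together, the test changes above migrate callers from `FileSystem.open(Path)` to the `openFile()` builder so that an already-known `FileStatus` can be handed to the stream. The sketch below is not part of the patch; it is a minimal illustration, against the public Hadoop `FileSystem` API and a caller-supplied ABFS path, of how client code can pair `getFileStatus()` with `openFile().withFileStatus()` so that the lazy-open path needs no additional GetPathStatus (HEAD) request before the first `read()`.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative sketch (not part of the patch): reuse a FileStatus obtained
 * once so that openFile() does not need its own HEAD call before reading.
 * The path is a placeholder supplied by the caller, e.g. an abfs:// URI.
 */
public class OpenWithKnownStatus {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // e.g. abfs://container@account.dfs.core.windows.net/data/file.bin
    try (FileSystem fs = FileSystem.get(path.toUri(), conf)) {
      // One metadata round trip; the status is then reused by the builder.
      FileStatus status = fs.getFileStatus(path);
      try (FSDataInputStream in = fs.openFile(path)
          .withFileStatus(status)   // lets the store skip its own GetPathStatus
          .build()                  // build() returns a CompletableFuture
          .get()) {
        byte[] buffer = new byte[8192];
        int bytesRead = in.read(buffer, 0, buffer.length); // data GET happens here
        System.out.println("read " + bytesRead + " bytes from " + path);
      }
    }
  }
}
```

When the status is not known up front, the tests show the other supported shape: build the stream without `withFileStatus()` and let the first `read()` surface `FileNotFoundException` for missing paths or directories, as `testDirectoryReadWithHeadOptimization` and `testInvalidPathReadWithHeadOptimization` exercise.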