diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 1242122f03015..3db1565c7057e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -138,6 +138,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE) private int footerReadBufferSize; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_BUFFERED_PREAD_DISABLE, + DefaultValue = DEFAULT_BUFFERED_PREAD_DISABLE) + private boolean isBufferedPReadDisabled; + @BooleanConfigurationValidatorAnnotation( ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED) @@ -953,6 +958,14 @@ public int getFooterReadBufferSize() { return this.footerReadBufferSize; } + /** + * Returns whether the buffered pread is disabled. + * @return true if buffered pread is disabled, false otherwise. + */ + public boolean isBufferedPReadDisabled() { + return this.isBufferedPReadDisabled; + } + public int getReadBufferSize() { return this.readBufferSize; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 439b7626a86a7..2732c0ed8fb31 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -945,8 +945,9 @@ contentLength, populateAbfsInputStreamContext( private AbfsInputStreamContext populateAbfsInputStreamContext( Optional options, ContextEncryptionAdapter contextEncryptionAdapter) { boolean bufferedPreadDisabled = options - .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) - .orElse(false); + .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, + getAbfsConfiguration().isBufferedPReadDisabled())) + .orElse(getAbfsConfiguration().isBufferedPReadDisabled()); int footerReadBufferSize = options.map(c -> c.getInt( AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())) .orElse(getAbfsConfiguration().getFooterReadBufferSize()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 8bee2c72b6b65..fe4991c9582d5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -174,6 +174,8 @@ public final class AbfsHttpConstants { public static final char CHAR_STAR = '*'; public static final char CHAR_PLUS = '+'; + public static final int SPLIT_NO_LIMIT = -1; + /** * Specifies the version of the REST protocol used for processing the request. * Versions should be added in enum list in ascending chronological order. 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 824a4c9701ea0..8bcd55aee8e35 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -75,6 +75,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false; public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true; public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB; + public static final boolean DEFAULT_BUFFERED_PREAD_DISABLE = false; public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false; public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB; public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java new file mode 100644 index 0000000000000..332a5a5ac56e2 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.constants; + +/** + * Enumeration for different types of read operations triggered by AbfsInputStream. + */ +public enum ReadType { + /** + * Synchronous read from the storage service. No optimization is applied. + */ + DIRECT_READ("DR"), + /** + * Synchronous read from the storage service where optimizations were considered but found disabled. + */ + NORMAL_READ("NR"), + /** + * Asynchronous read from the storage service for filling up the cache. + */ + PREFETCH_READ("PR"), + /** + * Synchronous read from the storage service when nothing was found in the cache. + */ + MISSEDCACHE_READ("MR"), + /** + * Synchronous read from the storage service for reading the footer of a file. + * Only triggered when footer read optimization kicks in. + */ + FOOTER_READ("FR"), + /** + * Synchronous read from the storage service for reading a small file fully. + * Only triggered when small file read optimization kicks in. + */ + SMALLFILE_READ("SR"), + /** + * None of the above read types were applicable. + */ + UNKNOWN_READ("UR"); + + private final String readType; + + ReadType(String readType) { + this.readType = readType; + } + + /** + * Get the read type as a string.
+ * + * @return the read type string + */ + @Override + public String toString() { + return readType; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 3dc7f88e52911..38b49603fbb00 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; @@ -165,6 +166,7 @@ public AbfsInputStream( this.tracingContext = new TracingContext(tracingContext); this.tracingContext.setOperation(FSOperationType.READ); this.tracingContext.setStreamID(inputStreamId); + this.tracingContext.setReadType(ReadType.UNKNOWN_READ); this.context = abfsInputStreamContext; readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); if (abfsReadFooterMetrics != null) { @@ -227,7 +229,9 @@ public int read(long position, byte[] buffer, int offset, int length) if (streamStatistics != null) { streamStatistics.readOperationStarted(); } - int bytesRead = readRemote(position, buffer, offset, length, tracingContext); + TracingContext tc = new TracingContext(tracingContext); + tc.setReadType(ReadType.DIRECT_READ); + int bytesRead = readRemote(position, buffer, offset, length, tc); if (statistics != null) { statistics.incrementBytesRead(bytesRead); } @@ -345,6 +349,8 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO buffer = new byte[bufferSize]; } + // Reset Read Type back to normal and set again based on code flow. 
+ tracingContext.setReadType(ReadType.NORMAL_READ); if (alwaysReadBufferSize) { bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { @@ -385,6 +391,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len) // data need to be copied to user buffer from index bCursor, bCursor has // to be the current fCusor bCursor = (int) fCursor; + tracingContext.setReadType(ReadType.SMALLFILE_READ); return optimisedRead(b, off, len, 0, contentLength); } @@ -405,6 +412,7 @@ private int readLastBlock(final byte[] b, final int off, final int len) bCursor = (int) (fCursor - lastBlockStart); // 0 if contentlength is < buffersize long actualLenToRead = min(footerReadSize, contentLength); + tracingContext.setReadType(ReadType.FOOTER_READ); return optimisedRead(b, off, len, lastBlockStart, actualLenToRead); } @@ -520,6 +528,7 @@ private int readInternal(final long position, final byte[] b, final int offset, LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads); TracingContext readAheadTracingContext = new TracingContext(tracingContext); readAheadTracingContext.setPrimaryRequestID(); + readAheadTracingContext.setReadType(ReadType.PREFETCH_READ); while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); @@ -544,7 +553,9 @@ private int readInternal(final long position, final byte[] b, final int offset, } // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length, new TracingContext(tracingContext)); + TracingContext tc = new TracingContext(tracingContext); + tc.setReadType(ReadType.MISSEDCACHE_READ); + receivedBytes = readRemote(position, b, offset, length, tc); return receivedBytes; } else { LOG.debug("read ahead disabled, reading remote"); @@ -578,6 +589,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t streamStatistics.remoteReadOperation(); } LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); + tracingContext.setPosition(String.valueOf(position)); op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get(), contextEncryptionAdapter, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java index d93bafb676cb3..ecc4a55090fa1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobDeleteHandler.java @@ -126,7 +126,7 @@ public boolean execute() throws AzureBlobFileSystemException { */ deleted = recursive ? 
safeDelete(path) : deleteInternal(path); } finally { - tracingContext.setOperatedBlobCount(null); + tracingContext.setOperatedBlobCount(0); } if (deleteCount.get() == 0) { /* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java index 1f22d049ecea9..069c705cca68e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java @@ -204,7 +204,7 @@ private boolean finalSrcRename() throws AzureBlobFileSystemException { } throw e; } finally { - tracingContext.setOperatedBlobCount(null); + tracingContext.setOperatedBlobCount(0); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java index d976c6f9b6617..859b9474cfe1a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.utils; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; /** * Interface for testing identifiers tracked via TracingContext @@ -32,4 +33,5 @@ public interface Listener { void setOperation(FSOperationType operation); void updateIngressHandler(String ingressHandler); void updatePosition(String position); + void updateReadType(ReadType readType); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index cd462ebab088e..0179718a06e8c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -25,9 +25,12 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; @@ -64,9 +67,10 @@ public class TracingContext { //final concatenated ID list set into x-ms-client-request-id header private String header = EMPTY_STRING; private String ingressHandler = EMPTY_STRING; - private String position = EMPTY_STRING; + private String position = EMPTY_STRING; // position of read/write in remote file private String metricResults = EMPTY_STRING; private String metricHeader = EMPTY_STRING; + private ReadType readType = ReadType.UNKNOWN_READ; /** * If {@link #primaryRequestId} is null, this field shall be set equal @@ -76,9 +80,8 @@ public class TracingContext { * will not change this field. 
In case {@link #primaryRequestId} is non-null, * this field shall not be set. */ - private String primaryRequestIdForRetry; - - private Integer operatedBlobCount = null; + private String primaryRequestIdForRetry = EMPTY_STRING; + private Integer operatedBlobCount = 0; // Only relevant for rename-delete over blob endpoint where it will be explicitly set. private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72; @@ -142,6 +145,7 @@ public TracingContext(TracingContext originalTracingContext) { this.listener = originalTracingContext.listener.getClone(); } this.metricResults = originalTracingContext.metricResults; + this.readType = originalTracingContext.readType; } public static String validateClientCorrelationID(String clientCorrelationID) { if ((clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH) @@ -181,8 +185,24 @@ public void setListener(Listener listener) { } /** - * Concatenate all identifiers separated by (:) into a string and set into + * Concatenate all components separated by (:) into a string and set into * X_MS_CLIENT_REQUEST_ID header of the http operation + * Following are the components in order of concatenation: + * * @param httpOperation AbfsHttpOperation instance to set header into * connection * @param previousFailure Failure seen before this API trigger on same operation @@ -193,32 +213,33 @@ public void setListener(Listener listener) { public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure, String retryPolicyAbbreviation) { clientRequestId = UUID.randomUUID().toString(); switch (format) { - case ALL_ID_FORMAT: // Optional IDs (e.g. streamId) may be empty - header = - clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + ":" - + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID - + ":" + opType + ":" + retryCount; - header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation); - if (!(ingressHandler.equals(EMPTY_STRING))) { - header += ":" + ingressHandler; - } - if (!(position.equals(EMPTY_STRING))) { - header += ":" + position; - } - if (operatedBlobCount != null) { - header += (":" + operatedBlobCount); - } - header += (":" + httpOperation.getTracingContextSuffix()); - metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : ""; + case ALL_ID_FORMAT: + header = TracingHeaderVersion.getCurrentVersion() + COLON + + clientCorrelationID + COLON + + clientRequestId + COLON + + fileSystemID + COLON + + getPrimaryRequestIdForHeader(retryCount > 0) + COLON + + streamID + COLON + + opType + COLON + + getRetryHeader(previousFailure, retryPolicyAbbreviation) + COLON + + ingressHandler + COLON + + position + COLON + + operatedBlobCount + COLON + + getOperationSpecificHeader(opType) + COLON + + httpOperation.getTracingContextSuffix(); + + metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING; break; case TWO_ID_FORMAT: - header = clientCorrelationID + ":" + clientRequestId; - metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : ""; + header = TracingHeaderVersion.getCurrentVersion() + COLON + + clientCorrelationID + COLON + clientRequestId; + metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING; break; default: //case SINGLE_ID_FORMAT - header = clientRequestId; - metricHeader += !(metricResults.trim().isEmpty()) ? 
metricResults : ""; + header = TracingHeaderVersion.getCurrentVersion() + COLON + + clientRequestId; + metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING; } if (listener != null) { //for testing listener.callTracingHeaderValidator(header, format); @@ -234,7 +255,7 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail * of the x-ms-client-request-id header in case of retry of the same API-request. */ if (primaryRequestId.isEmpty() && previousFailure == null) { - String[] clientRequestIdParts = clientRequestId.split("-"); + String[] clientRequestIdParts = clientRequestId.split(String.valueOf(CHAR_HYPHEN)); primaryRequestIdForRetry = clientRequestIdParts[ clientRequestIdParts.length - 1]; } @@ -254,15 +275,50 @@ private String getPrimaryRequestIdForHeader(final Boolean isRetry) { return primaryRequestIdForRetry; } - private String addFailureReasons(final String header, - final String previousFailure, String retryPolicyAbbreviation) { + /** + * Get the retry header string in format retryCount_failureReason_retryPolicyAbbreviation + * retryCount is always there and 0 for first request. + * failureReason is null for first request + * retryPolicyAbbreviation is only present when request fails with ConnectionTimeout + * @param previousFailure Previous failure reason, null if not a retried request + * @param retryPolicyAbbreviation Abbreviation of retry policy used to get retry interval + * @return String representing the retry header + */ + private String getRetryHeader(final String previousFailure, String retryPolicyAbbreviation) { + String retryHeader = String.format("%d", retryCount); if (previousFailure == null) { - return header; + return retryHeader; } if (CONNECTION_TIMEOUT_ABBREVIATION.equals(previousFailure) && retryPolicyAbbreviation != null) { - return String.format("%s_%s_%s", header, previousFailure, retryPolicyAbbreviation); + return String.format("%s_%s_%s", retryHeader, previousFailure, retryPolicyAbbreviation); + } + return String.format("%s_%s", retryHeader, previousFailure); + } + + /** + * Get the operation specific header for the current operation type. + * @param opType The operation type for which the header is needed + * @return String representing the operation specific header + */ + private String getOperationSpecificHeader(FSOperationType opType) { + // Similar header can be added for other operations in the future. + switch (opType) { + case READ: + return getReadSpecificHeader(); + default: + return EMPTY_STRING; // no operation specific header } - return String.format("%s_%s", header, previousFailure); + } + + /** + * Get the operation specific header for read operations. + * @return String representing the read specific header + */ + private String getReadSpecificHeader() { + // More information on read can be added to this header in the future. + // As underscore separated values. + String readHeader = String.format("%s", readType.toString()); + return readHeader; } public void setOperatedBlobCount(Integer count) { @@ -322,4 +378,15 @@ public void setPosition(final String position) { listener.updatePosition(position); } } + + /** + * Sets the read type for the current operation. + * @param readType the read type to set, must not be null. 
+ */ + public void setReadType(ReadType readType) { + this.readType = readType; + if (listener != null) { + listener.updateReadType(readType); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java new file mode 100644 index 0000000000000..4e5a2d650f8d6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.utils; + +/** + * Enum representing the version of the tracing header used in Azure Blob File System (ABFS). + * It defines two versions: V0 and V1, with their respective field counts. + * Any changes to the tracing header should introduce a new version so that every + * version has a fixed predefined schema of fields. + */ +public enum TracingHeaderVersion { + + /** + * Version 0 of the tracing header, which has no version prefix and contains 8 permanent and a few optional fields. + * This is the initial version of the tracing header. + */ + V0("", 8), + /** + * Version 1 of the tracing header, which includes a version prefix and has 13 permanent fields. + * This version is used for the current tracing header schema. + * Schema: version:clientCorrelationId:clientRequestId:fileSystemId + * :primaryRequestId:streamId:opType:retryHeader:ingressHandler + * :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader + */ + V1("v1", 13); + + private final String versionString; + private final int fieldCount; + + TracingHeaderVersion(String versionString, int fieldCount) { + this.versionString = versionString; + this.fieldCount = fieldCount; + } + + @Override + public String toString() { + return versionString; + } + + /** + * Returns the latest version of the tracing header. Any changes done to the + * schema of tracing context header should be accompanied by a version bump. + * @return the latest version of the tracing header. 
+ */ + public static TracingHeaderVersion getCurrentVersion() { + return V1; + } + + public int getFieldCount() { + return fieldCount; + } + + public String getVersionString() { + return versionString; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 207f0a8c7e39e..9be4998cb8217 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -23,6 +23,7 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -707,6 +708,12 @@ protected void assertPathDns(Path path) { .contains(expectedDns); } + protected byte[] getRandomBytesArray(int length) { + final byte[] b = new byte[length]; + new Random().nextBytes(b); + return b; + } + /** * Checks a list of futures for exceptions. * diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java index 5310a6044ba6a..7e05754996f42 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java @@ -48,7 +48,10 @@ import org.apache.hadoop.util.Preconditions; import org.opentest4j.TestAbortedException; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT; @@ -227,8 +230,8 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString()); tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); String header = tracingContext.getHeader(); - String clientRequestIdUsed = header.split(":")[1]; - String[] clientRequestIdUsedParts = clientRequestIdUsed.split("-"); + String clientRequestIdUsed = header.split(COLON, SPLIT_NO_LIMIT)[2]; + String[] clientRequestIdUsedParts = clientRequestIdUsed.split(String.valueOf(CHAR_HYPHEN)); String assertionPrimaryId = clientRequestIdUsedParts[clientRequestIdUsedParts.length - 1]; tracingContext.setRetryCount(1); @@ -239,7 +242,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); header = tracingContext.getHeader(); - String primaryRequestId = header.split(":")[3]; + String primaryRequestId = header.split(COLON, SPLIT_NO_LIMIT)[4]; Assertions.assertThat(primaryRequestId) 
.describedAs("PrimaryRequestId in a retried request's " @@ -264,7 +267,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString()); tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); String header = tracingContext.getHeader(); - String assertionPrimaryId = header.split(":")[3]; + String assertionPrimaryId = header.split(COLON)[3]; tracingContext.setRetryCount(1); tracingContext.setListener(new TracingHeaderValidator( @@ -274,7 +277,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); header = tracingContext.getHeader(); - String primaryRequestId = header.split(":")[3]; + String primaryRequestId = header.split(COLON)[3]; Assertions.assertThat(primaryRequestId) .describedAs("PrimaryRequestId in a retried request's tracingContext " @@ -326,8 +329,8 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin } private void checkHeaderForRetryPolicyAbbreviation(String header, String expectedFailureReason, String expectedRetryPolicyAbbreviation) { - String[] headerContents = header.split(":"); - String previousReqContext = headerContents[6]; + String[] headerContents = header.split(COLON, SPLIT_NO_LIMIT); + String previousReqContext = headerContents[7]; if (expectedFailureReason != null) { Assertions.assertThat(previousReqContext.split("_")[1]).describedAs( diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index b20596e310130..c80f727abe47b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Random; import java.util.regex.Pattern; import org.assertj.core.api.Assertions; @@ -604,17 +603,6 @@ public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) { return client.getTokenProvider(); } - /** - * Test helper method to get random bytes array. - * @param length The length of byte buffer. - * @return byte buffer. 
- */ - private byte[] getRandomBytesArray(int length) { - final byte[] b = new byte[length]; - new Random().nextBytes(b); - return b; - } - @Override public AzureBlobFileSystem getFileSystem(final Configuration configuration) throws Exception { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index d6572443bb9fc..cdb40ddd521d5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -24,7 +24,6 @@ import java.net.URL; import java.util.Arrays; import java.util.List; -import java.util.Random; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -140,17 +139,6 @@ public ITestAbfsRestOperation() throws Exception { super(); } - /** - * Test helper method to get random bytes array. - * @param length The length of byte buffer - * @return byte buffer - */ - private byte[] getRandomBytesArray(int length) { - final byte[] b = new byte[length]; - new Random().nextBytes(b); - return b; - } - @Override public AzureBlobFileSystem getFileSystem(final Configuration configuration) throws Exception { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index de49da5dc51d2..a4b5cd068e941 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -22,14 +22,17 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.List; import java.util.Optional; import java.util.Random; import java.util.concurrent.ExecutionException; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; @@ -42,13 +45,25 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderVersion; import org.apache.hadoop.fs.impl.OpenFileParameters; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; +import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ; +import static 
org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ; +import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ; +import static org.apache.hadoop.fs.azurebfs.constants.ReadType.NORMAL_READ; +import static org.apache.hadoop.fs.azurebfs.constants.ReadType.PREFETCH_READ; +import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; @@ -85,6 +100,9 @@ public class TestAbfsInputStream extends private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD = REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + private static final int POSITION_INDEX = 9; + private static final int OPERATION_INDEX = 6; + private static final int READTYPE_INDEX = 11; @Override public void teardown() throws Exception { @@ -781,6 +799,183 @@ public void testDefaultReadaheadQueueDepth() throws Exception { in.close(); } + /** + * Test to verify that the read type and position are correctly set in the + * client request id header for various types of read operations performed. + * @throws Exception if any error occurs during the test + */ + @Test + public void testReadTypeInTracingContextHeader() throws Exception { + AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore()); + AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration()); + AbfsClient spiedClient = Mockito.spy(spiedStore.getClient()); + Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize(); + Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize(); + Mockito.doReturn(spiedClient).when(spiedStore).getClient(); + Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); + Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration(); + int totalReadCalls = 0; + int fileSize; + + /* + * Test to verify Normal Read Type. + * Disabling read ahead ensures that read type is normal read. + */ + fileSize = 3 * ONE_MB; // To make sure multiple blocks are read. + totalReadCalls += 3; // 3 blocks of 1MB each. + doReturn(false).when(spiedConfig).isReadAheadV2Enabled(); + doReturn(false).when(spiedConfig).isReadAheadEnabled(); + testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, NORMAL_READ, 3, totalReadCalls); + + /* + * Test to verify Missed Cache Read Type. + * Setting read ahead queue depth to 0 ensures that nothing can be served from prefetch. + * In such a case the input stream will do a sequential read with missed cache read type. + */ + fileSize = 3 * ONE_MB; // To make sure multiple blocks are read with MR + totalReadCalls += 3; // 3 blocks of 1MB each. + Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth(); + doReturn(true).when(spiedConfig).isReadAheadEnabled(); + testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, MISSEDCACHE_READ, 3, totalReadCalls); + + /* + * Test to verify Prefetch Read Type. + * Setting a non-zero read ahead queue depth with prefetch enabled ensures that prefetching is done. + * The first read here might be Normal or Missed Cache Read, but the remaining 2 should be Prefetch Read. + */ + fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
+ totalReadCalls += 3; + doReturn(true).when(spiedConfig).isReadAheadEnabled(); + Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth(); + testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, PREFETCH_READ, 3, totalReadCalls); + + /* + * Test to verify Footer Read Type. + * Having file size less than footer read size and disabling small file optimization ensures footer read kicks in. + */ + fileSize = 8 * ONE_KB; + totalReadCalls += 1; // Full file will be read along with footer. + doReturn(false).when(spiedConfig).readSmallFilesCompletely(); + doReturn(true).when(spiedConfig).optimizeFooterRead(); + testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, FOOTER_READ, 1, totalReadCalls); + + /* + * Test to verify Small File Read Type. + * Having file size less than block size and disabling footer read optimization ensures small file read kicks in. + */ + totalReadCalls += 1; // Full file will be read completely. + doReturn(true).when(spiedConfig).readSmallFilesCompletely(); + doReturn(false).when(spiedConfig).optimizeFooterRead(); + testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, SMALLFILE_READ, 1, totalReadCalls); + + /* + * Test to verify Direct Read Type and a read from random position. + * A separate positioned read method of AbfsInputStream needs to be called. + */ + fileSize = ONE_MB; + totalReadCalls += 1; + doReturn(false).when(spiedConfig).readSmallFilesCompletely(); + doReturn(true).when(spiedConfig).isBufferedPReadDisabled(); + Path testPath = createTestFile(spiedFs, fileSize); + try (FSDataInputStream iStream = spiedFs.open(testPath)) { + AbfsInputStream stream = (AbfsInputStream) iStream.getWrappedStream(); + int bytesRead = stream.read(ONE_MB/3, new byte[fileSize], 0, + fileSize); + Assertions.assertThat(fileSize - ONE_MB/3) + .describedAs("Read size should match remaining file size") + .isEqualTo(bytesRead); + } + assertReadTypeInClientRequestId(spiedFs, 1, totalReadCalls, DIRECT_READ); + } + + private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs, + int fileSize, ReadType readType, int numOfReadCalls, int totalReadCalls) throws Exception { + Path testPath = createTestFile(fs, fileSize); + readFile(fs, testPath, fileSize); + assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls, readType); + } + + private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Exception { + Path testPath = new Path("testFile"); + byte[] fileContent = getRandomBytesArray(fileSize); + try (FSDataOutputStream oStream = fs.create(testPath)) { + oStream.write(fileContent); + oStream.flush(); + } + return testPath; + } + + private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize) throws Exception { + try (FSDataInputStream iStream = fs.open(testPath)) { + int bytesRead = iStream.read(new byte[fileSize], 0, + fileSize); + Assertions.assertThat(fileSize) + .describedAs("Read size should match file size") + .isEqualTo(bytesRead); + } + } + + private void assertReadTypeInClientRequestId(AzureBlobFileSystem fs, int numOfReadCalls, + int totalReadCalls, ReadType readType) throws Exception { + ArgumentCaptor<String> captor1 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<Long> captor2 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor<byte[]> captor3 = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor<Integer> captor4 = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor<Integer> captor5 = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor<String> captor6 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> captor7 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<ContextEncryptionAdapter> captor8 = ArgumentCaptor.forClass(ContextEncryptionAdapter.class); + ArgumentCaptor<TracingContext> captor9 = ArgumentCaptor.forClass(TracingContext.class); + + verify(fs.getAbfsStore().getClient(), times(totalReadCalls)).read( + captor1.capture(), captor2.capture(), captor3.capture(), + captor4.capture(), captor5.capture(), captor6.capture(), + captor7.capture(), captor8.capture(), captor9.capture()); + List<TracingContext> tracingContextList = captor9.getAllValues(); + if (readType == PREFETCH_READ) { + /* + * With prefetch enabled, the first read can be Normal or Missed Cache Read. + * So we will assert only for the last 2 calls, which should be Prefetch Read. + * Since calls are asynchronous, we cannot guarantee the order of calls. + * Therefore, we cannot assert on exact position here. + */ + for (int i = tracingContextList.size() - (numOfReadCalls - 1); i < tracingContextList.size(); i++) { + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, -1); + } + } else if (readType == DIRECT_READ) { + int expectedReadPos = ONE_MB/3; + for (int i = tracingContextList.size() - numOfReadCalls; i < tracingContextList.size(); i++) { + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, expectedReadPos); + expectedReadPos += ONE_MB; + } + } else { + int expectedReadPos = 0; + for (int i = tracingContextList.size() - numOfReadCalls; i < tracingContextList.size(); i++) { + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, expectedReadPos); + expectedReadPos += ONE_MB; + } + } + } + + private void verifyHeaderForReadTypeInTracingContextHeader(TracingContext tracingContext, ReadType readType, int expectedReadPos) { + AbfsHttpOperation mockOp = Mockito.mock(AbfsHttpOperation.class); + doReturn(EMPTY_STRING).when(mockOp).getTracingContextSuffix(); + tracingContext.constructHeader(mockOp, null, null); + String[] idList = tracingContext.getHeader().split(COLON, SPLIT_NO_LIMIT); + Assertions.assertThat(idList).describedAs("Client Request Id should have all fields").hasSize( + TracingHeaderVersion.getCurrentVersion().getFieldCount()); + if (expectedReadPos > 0) { + Assertions.assertThat(idList[POSITION_INDEX]) + .describedAs("Read Position should match") + .isEqualTo(Integer.toString(expectedReadPos)); + } + Assertions.assertThat(idList[OPERATION_INDEX]).describedAs("Operation Type Should Be Read") + .isEqualTo(FSOperationType.READ.toString()); + Assertions.assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match") + .isEqualTo(readType.toString()); + } + private void testReadAheads(AbfsInputStream inputStream, int readRequestSize, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheHttpClientFallback.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheHttpClientFallback.java index 159405d86815d..201b712b40f2c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheHttpClientFallback.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestApacheHttpClientFallback.java @@ -61,15 +61,15 @@ private TracingContext getSampleTracingContext(int[] jdkCallsRegister, answer.callRealMethod(); AbfsHttpOperation op = answer.getArgument(0); if (op instanceof AbfsAHCHttpOperation) { - Assertions.assertThat(tc.getHeader()).endsWith(APACHE_IMPL); + Assertions.assertThat(tc.getHeader()).contains(APACHE_IMPL); apacheCallsRegister[0]++; } if (op instanceof
AbfsJdkHttpOperation) { jdkCallsRegister[0]++; if (AbfsApacheHttpClient.usable()) { - Assertions.assertThat(tc.getHeader()).endsWith(JDK_IMPL); + Assertions.assertThat(tc.getHeader()).contains(JDK_IMPL); } else { - Assertions.assertThat(tc.getHeader()).endsWith(JDK_FALLBACK); + Assertions.assertThat(tc.getHeader()).contains(JDK_FALLBACK); } } return null; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index d836eb13b19b6..4bdb3a68ae3f2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -21,8 +21,10 @@ import org.assertj.core.api.Assertions; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; /** * Used to validate correlation identifiers provided during testing against @@ -40,7 +42,8 @@ public class TracingHeaderValidator implements Listener { private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; private String ingressHandler = null; - private String position = null; + private String position = String.valueOf(0); + private ReadType readType = ReadType.UNKNOWN_READ; private Integer operatedBlobCount = null; @@ -59,6 +62,7 @@ public TracingHeaderValidator getClone() { tracingHeaderValidator.primaryRequestId = primaryRequestId; tracingHeaderValidator.ingressHandler = ingressHandler; tracingHeaderValidator.position = position; + tracingHeaderValidator.readType = readType; tracingHeaderValidator.operatedBlobCount = operatedBlobCount; return tracingHeaderValidator; } @@ -81,45 +85,43 @@ public TracingHeaderValidator(String clientCorrelationId, String fileSystemId, } private void validateTracingHeader(String tracingContextHeader) { - String[] idList = tracingContextHeader.split(":"); + String[] idList = tracingContextHeader.split(":", SPLIT_NO_LIMIT); validateBasicFormat(idList); if (format != TracingHeaderFormat.ALL_ID_FORMAT) { return; } - if (idList.length >= 8) { - if (operatedBlobCount != null) { - Assertions.assertThat(Integer.parseInt(idList[7])) - .describedAs("OperatedBlobCount is incorrect") - .isEqualTo(operatedBlobCount); - } + + // Validate Operated Blob Count + if (operatedBlobCount != null) { + Assertions.assertThat(Integer.parseInt(idList[10])) + .describedAs("OperatedBlobCount is incorrect") + .isEqualTo(operatedBlobCount); } - if (!primaryRequestId.isEmpty() && !idList[3].isEmpty()) { - Assertions.assertThat(idList[3]) + + // Validate Primary Request ID + if (!primaryRequestId.isEmpty() && !idList[4].isEmpty()) { + Assertions.assertThat(idList[4]) .describedAs("PrimaryReqID should be common for these requests") .isEqualTo(primaryRequestId); } + + // Validate Stream ID if (!streamID.isEmpty()) { - Assertions.assertThat(idList[4]) + Assertions.assertThat(idList[5]) .describedAs("Stream id should be common for these requests") .isEqualTo(streamID); } } private void validateBasicFormat(String[] idList) { + // Validate Version and Number of fields in the header + Assertions.assertThat(idList[0]).describedAs("Version should be present") + 
.isEqualTo(TracingHeaderVersion.getCurrentVersion().toString()); + int expectedSize = 0; if (format == TracingHeaderFormat.ALL_ID_FORMAT) { - int expectedSize = 8; - if (operatedBlobCount != null) { - expectedSize += 1; - } - if (ingressHandler != null) { - expectedSize += 2; - } - Assertions.assertThat(idList) - .describedAs("header should have " + expectedSize + " elements") - .hasSize(expectedSize); + expectedSize = TracingHeaderVersion.getCurrentVersion().getFieldCount(); } else if (format == TracingHeaderFormat.TWO_ID_FORMAT) { - Assertions.assertThat(idList) - .describedAs("header should have 2 elements").hasSize(2); + expectedSize = 3; } else { Assertions.assertThat(idList).describedAs("header should have 1 element") .hasSize(1); @@ -127,36 +129,49 @@ private void validateBasicFormat(String[] idList) { .describedAs("Client request ID is a guid").matches(GUID_PATTERN); return; } + Assertions.assertThat(idList) + .describedAs("header should have " + expectedSize + " elements") + .hasSize(expectedSize); + // Validate Client Correlation ID if (clientCorrelationId.matches("[a-zA-Z0-9-]*")) { - Assertions.assertThat(idList[0]) + Assertions.assertThat(idList[1]) .describedAs("Correlation ID should match config") .isEqualTo(clientCorrelationId); } else { - Assertions.assertThat(idList[0]) + Assertions.assertThat(idList[1]) .describedAs("Invalid config should be replaced with empty string") .isEmpty(); } - Assertions.assertThat(idList[1]).describedAs("Client request ID is a guid") + + // Validate Client Request ID + Assertions.assertThat(idList[2]).describedAs("Client request ID is a guid") .matches(GUID_PATTERN); if (format != TracingHeaderFormat.ALL_ID_FORMAT) { return; } - Assertions.assertThat(idList[2]).describedAs("Filesystem ID incorrect") + // Validate FileSystem ID + Assertions.assertThat(idList[3]).describedAs("Filesystem ID incorrect") .isEqualTo(fileSystemId); + + // Validate Primary Request ID if (needsPrimaryRequestId && !operation .equals(FSOperationType.READ)) { - Assertions.assertThat(idList[3]).describedAs("should have primaryReqId") + Assertions.assertThat(idList[4]).describedAs("should have primaryReqId") .isNotEmpty(); } - Assertions.assertThat(idList[5]).describedAs("Operation name incorrect") + + // Validate Operation Type + Assertions.assertThat(idList[6]).describedAs("Operation name incorrect") .isEqualTo(operation.toString()); - if (idList[6].contains("_")) { - idList[6] = idList[6].split("_")[0]; + + // Validate Retry Header + if (idList[7].contains("_")) { + idList[7] = idList[7].split("_")[0]; } - int retryCount = Integer.parseInt(idList[6]); + int retryCount = Integer.parseInt(idList[7]); Assertions.assertThat(retryCount) .describedAs("Retry was required due to issue on server side") .isEqualTo(retryNum); @@ -186,8 +201,13 @@ public void updatePosition(String position) { this.position = position; } + @Override + public void updateReadType(ReadType readType) { + this.readType = readType; + } + /** - * Sets the value of the number of blobs operated on + * Sets the value of the number of blobs operated on. * @param operatedBlobCount number of blobs operated on */ public void setOperatedBlobCount(Integer operatedBlobCount) {
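Note on the new header layout: with this change the x-ms-client-request-id value becomes a fixed, versioned, colon-separated string whose field order is documented on TracingHeaderVersion.V1 (version:clientCorrelationId:clientRequestId:fileSystemId:primaryRequestId:streamId:opType:retryHeader:ingressHandler:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader). The sketch below is illustrative only and not part of the patch; the sample header value and its abbreviations are fabricated, but the indices match the POSITION_INDEX, OPERATION_INDEX and READTYPE_INDEX constants used by the new test, and the split uses limit -1 (the new SPLIT_NO_LIMIT constant) so empty optional fields such as ingressHandler are preserved.

public class TracingHeaderV1Example {
  public static void main(String[] args) {
    // Fabricated sample: real headers carry GUIDs for the correlation, request,
    // filesystem and stream identifiers, and the opType/http suffix abbreviations
    // come from FSOperationType and the HTTP client actually in use.
    String sampleHeader = "v1:corr-id:client-req-id:fs-id:primary-req-id:stream-id:RD:0::1048576:0:FR:JDK";
    // Split with limit -1 (SPLIT_NO_LIMIT) so empty fields are kept; an
    // ALL_ID_FORMAT header then has TracingHeaderVersion.V1.getFieldCount() == 13 entries.
    String[] fields = sampleHeader.split(":", -1);
    System.out.println("version             = " + fields[0]);  // v1
    System.out.println("clientCorrelationId = " + fields[1]);
    System.out.println("clientRequestId     = " + fields[2]);
    System.out.println("fileSystemId        = " + fields[3]);
    System.out.println("primaryRequestId    = " + fields[4]);
    System.out.println("streamId            = " + fields[5]);
    System.out.println("opType              = " + fields[6]);  // OPERATION_INDEX
    System.out.println("retryHeader         = " + fields[7]);  // retryCount[_failureReason[_retryPolicy]]
    System.out.println("ingressHandler      = " + fields[8]);  // empty when not set
    System.out.println("position            = " + fields[9]);  // POSITION_INDEX
    System.out.println("operatedBlobCount   = " + fields[10]);
    System.out.println("readType            = " + fields[11]); // READTYPE_INDEX, e.g. FR for footer read
    System.out.println("httpOperation       = " + fields[12]);
  }
}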