Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public final class AbfsHttpConstants {
public static final String COPY_PROGRESS = "CopyProgress";
public static final String COPY_COMPLETION_TIME = "CopyCompletionTime";
public static final String COPY_STATUS_DESCRIPTION = "CopyStatusDescription";
public static final String BLOB_ERROR_CODE_START_XML = "<Code>";
public static final String BLOB_ERROR_CODE_END_XML = "</Code>";
public static final String BLOB_ERROR_MESSAGE_START_XML = "<Message>";
public static final String BLOB_ERROR_MESSAGE_END_XML = "</Message>";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -37,6 +36,7 @@
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
Expand All @@ -60,9 +60,14 @@

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_ERROR_CODE_END_XML;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_ERROR_CODE_START_XML;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMITTED_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EQUAL;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_ERROR_MESSAGE_END_XML;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_ERROR_MESSAGE_START_XML;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.WASB_DNS_PREFIX;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMP_LIST;

Expand All @@ -82,7 +87,6 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {

private static final int ONE_THOUSAND = 1000;
private static final int ONE_MILLION = ONE_THOUSAND * ONE_THOUSAND;

private final String method;
private final URL url;
private String maskedUrl;
Expand Down Expand Up @@ -466,7 +470,7 @@ public void processResponse(final byte[] buffer, final int offset, final int len
}

if (statusCode >= HttpURLConnection.HTTP_BAD_REQUEST) {
processStorageErrorResponse();
processServerErrorResponse();
if (this.isTraceEnabled) {
this.recvResponseTimeMs += elapsedTimeMs(startTime);
}
Expand Down Expand Up @@ -527,6 +531,15 @@ public void processResponse(final byte[] buffer, final int offset, final int len
}
}

@VisibleForTesting
void processServerErrorResponse() throws IOException {
if (getBaseUrl().contains(WASB_DNS_PREFIX)) {
processBlobStorageErrorResponse();
} else {
processDfsStorageErrorResponse();
}
}

/**
* Parse the stream from the response and set {@link #blobList} field of this
* class.
Expand Down Expand Up @@ -638,8 +651,8 @@ private HttpURLConnection openConnection() throws IOException {
* }
*
*/
private void processStorageErrorResponse() {
try (InputStream stream = connection.getErrorStream()) {
private void processDfsStorageErrorResponse() {
try (InputStream stream = getConnectionErrorStream()) {
if (stream == null) {
return;
}
Expand Down Expand Up @@ -680,6 +693,48 @@ private void processStorageErrorResponse() {
}
}

/**
* Extract errorCode and errorMessage from errorStream populated by server.
* Error-message in the form of:
* <pre>
* {@code
* <?xml version="1.0" encoding="utf-8"?>
* <Error>
* <Code>string-value</Code>
* <Message>string-value</Message>
* </Error>
* }
* </pre>
* <a href= "https://learn.microsoft.com/en-us/rest/api/storageservices/status-and-error-codes2">
* Reference</a>
*/
private void processBlobStorageErrorResponse() throws IOException {
InputStream errorStream = getConnectionErrorStream();
if (errorStream == null) {
return;
}
final String data = IOUtils.toString(errorStream, StandardCharsets.UTF_8);

int codeStartFirstInstance = data.indexOf(BLOB_ERROR_CODE_START_XML);
int codeEndFirstInstance = data.indexOf(BLOB_ERROR_CODE_END_XML);
if (codeEndFirstInstance != -1 && codeStartFirstInstance != -1) {
storageErrorCode = data.substring(codeStartFirstInstance,
codeEndFirstInstance).replace(BLOB_ERROR_CODE_START_XML, "");
}

int msgStartFirstInstance = data.indexOf(BLOB_ERROR_MESSAGE_START_XML);
int msgEndFirstInstance = data.indexOf(BLOB_ERROR_MESSAGE_END_XML);
if (msgEndFirstInstance != -1 && msgStartFirstInstance != -1) {
storageErrorMessage = data.substring(msgStartFirstInstance,
msgEndFirstInstance).replace(BLOB_ERROR_MESSAGE_START_XML, "");
}
}

@VisibleForTesting
InputStream getConnectionErrorStream() {
return connection.getErrorStream();
}

/**
* Returns the elapsed time in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.fs.azurebfs.utils.UriUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;

public class TestAbfsHttpOperation {

@Test
Expand Down Expand Up @@ -96,6 +103,77 @@ public void testUrlWithNullValues()
"http://www.testurl.net?abc=xyz&pqr=&mnop=");
}

@Test
public void testParseStorageErrorStreamBlob() throws Exception {
AbfsHttpOperation op = Mockito.spy(new AbfsHttpOperation(
new URL("https://account.blob.core.windows.net/container/path"),
HTTP_METHOD_PUT, new ArrayList<>()));
String xmlString = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" +
"<Error>\n" +
" <Code>string-value-code</Code>\n" +
" <Message>string-value-message</Message>\n" +
"</Error>";
ByteArrayInputStream inputStream = new ByteArrayInputStream(
xmlString.getBytes(
StandardCharsets.UTF_8));
Mockito.doReturn(inputStream).when(op).getConnectionErrorStream();
op.processServerErrorResponse();
Assertions.assertThat(op.getStorageErrorCode())
.isEqualTo("string-value-code");
Assertions.assertThat(op.getStorageErrorMessage())
.isEqualTo("string-value-message");

op = Mockito.spy(new AbfsHttpOperation(
new URL("https://account.blob.core.windows.net/container/path"),
HTTP_METHOD_PUT, new ArrayList<>()));
xmlString = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" +
"<Error>\n" +
" <Cod>string-value-code</Cod>\n" +
" <Message>string-value-message</Message>\n" +
"</Error>";
inputStream = new ByteArrayInputStream(xmlString.getBytes(
StandardCharsets.UTF_8));
Mockito.doReturn(inputStream).when(op).getConnectionErrorStream();
op.processServerErrorResponse();
Assertions.assertThat(op.getStorageErrorCode()).isEmpty();
Assertions.assertThat(op.getStorageErrorMessage())
.isEqualTo("string-value-message");

op = Mockito.spy(new AbfsHttpOperation(
new URL("https://account.blob.core.windows.net/container/path"),
HTTP_METHOD_PUT, new ArrayList<>()));
xmlString = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" +
"<Error>\n" +
" <Code>string-value-code</Code>\n" +
" <Messages>string-value-message</Messages>\n" +
"</Error>";
inputStream = new ByteArrayInputStream(xmlString.getBytes(
StandardCharsets.UTF_8));
Mockito.doReturn(inputStream).when(op).getConnectionErrorStream();
op.processServerErrorResponse();
Assertions.assertThat(op.getStorageErrorCode())
.isEqualTo("string-value-code");
Assertions.assertThat(op.getStorageErrorMessage()).isEmpty();
}

@Test
public void testParseStorageErrorStreamDfs() throws Exception {
AbfsHttpOperation op = Mockito.spy(new AbfsHttpOperation(
new URL("https://account.dfs.core.windows.net/container/path"),
HTTP_METHOD_PUT, new ArrayList<>()));
String xmlString
= "{\"error\":{\"code\":\"errorCode\", \"message\":\"errorMessage\"}}";
ByteArrayInputStream inputStream = new ByteArrayInputStream(
xmlString.getBytes(
StandardCharsets.UTF_8));
Mockito.doReturn(inputStream).when(op).getConnectionErrorStream();
op.processServerErrorResponse();
Assertions.assertThat(op.getStorageErrorCode())
.isEqualTo("errorCode");
Assertions.assertThat(op.getStorageErrorMessage())
.isEqualTo("errorMessage");
}

private void testIfMaskAndEncodeSuccessful(final String scenario,
final String url, final String expectedMaskedUrl)
throws UnsupportedEncodingException, MalformedURLException {
Expand Down