Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ private AuditConstants() {
*/
public static final String PARAM_PROCESS = "ps";

/**
* Header: Range for GET request data: {@value}.
*/
public static final String PARAM_RANGE = "r";

/**
* Task Attempt ID query header: {@value}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +36,7 @@
import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer;
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
import org.apache.hadoop.security.UserGroupInformation;

Expand Down Expand Up @@ -110,6 +112,14 @@ public class LoggingAuditor
*/
private Collection<String> filters;

/**
* Log for warning of problems getting the range of GetObjectRequest
* will only log of a problem once per process instance.
* This is to avoid logs being flooded with errors.
*/
private static final LogExactlyOnce WARN_INCORRECT_RANGE =
new LogExactlyOnce(LOG);

/**
* Create the auditor.
* The UGI current user is used to provide the principal;
Expand Down Expand Up @@ -230,6 +240,25 @@ private class LoggingAuditSpan extends AbstractAuditSpanImpl {

private final HttpReferrerAuditHeader referrer;

/**
* Attach Range of data for GetObject Request
* @param request given get object request
*/
private void attachRangeFromRequest(AmazonWebServiceRequest request) {
if (request instanceof GetObjectRequest) {
long[] rangeValue = ((GetObjectRequest) request).getRange();
if (rangeValue == null || rangeValue.length == 0) {
return;
}
if (rangeValue.length != 2) {
WARN_INCORRECT_RANGE.warn("Expected range to contain 0 or 2 elements. Got "
+ rangeValue.length + ". Ignoring");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually ignore the value here. We should return or put the next few lines in an else block.

We'd get an IndexOutOfBoundsException when we access rangeValue[1] when there's only one element, for example.

String combinedRangeValue = String.format("bytes=%d-%d", rangeValue[0], rangeValue[1]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make a two byte prefix ("rg"?) and put in AuditConstants?

  • shorter as we don't know the upper limit on referrer strings and would like to keep it down
  • in the file for other connectors to use, and to make it easy to look at what to expect

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steveloughran, can you elaborate on the rg prefix? Is it for the query parameter? If so, we already have that as r (PARAM_RANGE).

bytes= is part of the range header value which - following the SDK upgrade - can just be dumped in place, without needing reconstruction like this.

That being said, it does seem a bit of a waste. Maybe we drop it entirely here (and for all connectors) and just include whatever the value of the range header was minus bytes= prefix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, i'd missed that. yes, you are right. just include it without the bytes= stuff.

i guess we do have to consider what happens if/when s3 supports multiple ranges, but that would be just something like "1-3,4-7", wouldn't it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that makes sense.

The only risk I see is if the object store uses a range specified in another unit from bytes. In V1 SDK, it's hard coded to bytes - no issue there. In V2, I believe you specify the header itself so we can warn once if we see something other than bytes= prefix and ignore the header if so. We are the ones specifying the header in the first place anyway, right? (FYI @ahmarsuhail @passaro)

referrer.set(AuditConstants.PARAM_RANGE, combinedRangeValue);
}
}

private final String description;

private LoggingAuditSpan(
Expand Down Expand Up @@ -314,6 +343,8 @@ public void set(final String key, final String value) {
@Override
public <T extends AmazonWebServiceRequest> T beforeExecution(
final T request) {
// attach range for GetObject requests
attachRangeFromRequest(request);
// build the referrer header
final String header = referrer.buildHttpReferrer();
// update the outer class's field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;

import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
Expand Down Expand Up @@ -138,6 +140,17 @@ protected GetObjectMetadataRequest head() {
requestFactory.newGetObjectMetadataRequest("/"));
}

/**
* Create a GetObject request and modify it before passing it through auditor
* @param modifyRequest Consumer Interface for changing the request before passing to the auditor
* @return the request
*/
protected GetObjectRequest get(Consumer<GetObjectRequest> modifyRequest) {
GetObjectRequest req = requestFactory.newGetObjectRequest("/");
modifyRequest.accept(req);
return manager.beforeExecution(req);
}

/**
* Assert a head request fails as there is no
* active span.
Expand Down Expand Up @@ -210,4 +223,15 @@ protected void assertMapContains(final Map<String, String> params,
.isEqualTo(expected);
}

/**
* Assert the map does not contain the key, i.e, it is null.
* @param params map of params
* @param key key
*/
protected void assertMapNotContains(final Map<String, String> params, final String key) {
assertThat(params.get(key))
.describedAs(key)
.isNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.regex.Matcher;

import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -46,6 +47,7 @@
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH;
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PATH2;
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PRINCIPAL;
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_RANGE;
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0;
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD1;
import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP;
Expand Down Expand Up @@ -115,6 +117,7 @@ public void testHttpReferrerPatchesTheRequest() throws Throwable {
assertThat(span.getTimestamp())
.describedAs("Timestamp of " + span)
.isEqualTo(ts);
assertMapNotContains(params, PARAM_RANGE);

assertMapContains(params, PARAM_TIMESTAMP,
Long.toString(ts));
Expand Down Expand Up @@ -309,6 +312,44 @@ public void testStripWrappedQuotes() throws Throwable {
expectStrippedField("\"\"\"b\"", "b");
}

/**
* Verify that correct range is getting published in header
*/
@Test
public void testGetObjectRange() throws Throwable {
AuditSpan span = span();
GetObjectRequest request = get(getObjectRequest -> getObjectRequest.setRange(100, 200));
Map<String, String> headers
= request.getCustomRequestHeaders();
assertThat(headers)
.describedAs("Custom headers")
.containsKey(HEADER_REFERRER);
String header = headers.get(HEADER_REFERRER);
LOG.info("Header is {}", header);
Map<String, String> params
= HttpReferrerAuditHeader.extractQueryParameters(header);
assertMapContains(params, PARAM_RANGE, "bytes=100-200");
}

/**
* Verify that no range is getting added to the header in request without range
*/
@Test
public void testGetObjectWithoutRange() throws Throwable {
AuditSpan span = span();
GetObjectRequest request = get(getObjectRequest -> {});
Map<String, String> headers
= request.getCustomRequestHeaders();
assertThat(headers)
.describedAs("Custom headers")
.containsKey(HEADER_REFERRER);
String header = headers.get(HEADER_REFERRER);
LOG.info("Header is {}", header);
Map<String, String> params
= HttpReferrerAuditHeader.extractQueryParameters(header);
assertMapNotContains(params, PARAM_RANGE);
}

/**
* Expect a field with quote stripping to match the expected value.
* @param str string to strip
Expand Down