-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19365. AAL support for auditing. #7723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ | |
| import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; | ||
| import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO; | ||
| import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan; | ||
| import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan; | ||
| import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN; | ||
| import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED; | ||
| import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT; | ||
|
|
@@ -69,6 +70,8 @@ | |
| import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID; | ||
| import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER; | ||
| import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName; | ||
| import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME; | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID; | ||
|
|
||
| /** | ||
| * The LoggingAuditor logs operations at DEBUG (in SDK Request) and | ||
|
|
@@ -85,7 +88,6 @@ public class LoggingAuditor | |
| private static final Logger LOG = | ||
| LoggerFactory.getLogger(LoggingAuditor.class); | ||
|
|
||
|
|
||
| /** | ||
| * Some basic analysis for the logs. | ||
| */ | ||
|
|
@@ -267,8 +269,9 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) { | |
| */ | ||
| private class LoggingAuditSpan extends AbstractAuditSpanImpl { | ||
|
|
||
| private final HttpReferrerAuditHeader referrer; | ||
| private HttpReferrerAuditHeader referrer; | ||
|
|
||
| private final HttpReferrerAuditHeader.Builder headerBuilder; | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /** | ||
| * Attach Range of data for GetObject Request. | ||
| * @param request the sdk request to be modified | ||
|
|
@@ -300,7 +303,7 @@ private LoggingAuditSpan( | |
| final String path2) { | ||
| super(spanId, operationName); | ||
|
|
||
| this.referrer = HttpReferrerAuditHeader.builder() | ||
| this.headerBuilder = HttpReferrerAuditHeader.builder() | ||
| .withContextId(getAuditorId()) | ||
| .withSpanId(spanId) | ||
| .withOperationName(operationName) | ||
|
|
@@ -312,8 +315,9 @@ private LoggingAuditSpan( | |
| currentThreadID()) | ||
| .withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp())) | ||
| .withEvaluated(context.getEvaluatedEntries()) | ||
| .withFilter(filters) | ||
| .build(); | ||
| .withFilter(filters); | ||
|
|
||
| this.referrer = this.headerBuilder.build(); | ||
|
|
||
| this.description = referrer.buildHttpReferrer(); | ||
| } | ||
|
|
@@ -384,12 +388,33 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, | |
| SdkHttpRequest httpRequest = context.httpRequest(); | ||
| SdkRequest sdkRequest = context.request(); | ||
|
|
||
| // If spanId and operationName are set in execution attributes, then use these values, | ||
| // instead of the ones in the current span. This is useful when requests are happening in dependencies such as | ||
| // the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL | ||
| // will attach the current spanId and operationName via execution attributes during it's request creation. These | ||
| // can then used to update the values in the logger and referrer header. Without this overwriting, the operation | ||
| // name and corresponding span will be whichever is active on the thread the request is getting executed on. | ||
| boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes); | ||
|
|
||
| String spanId = isRequestAuditedOutsideCurrentSpan ? | ||
| executionAttributes.getAttribute(SPAN_ID) : getSpanId(); | ||
|
|
||
| String operationName = isRequestAuditedOutsideCurrentSpan ? | ||
| executionAttributes.getAttribute(OPERATION_NAME) : getOperationName(); | ||
|
|
||
| if (isRequestAuditedOutsideCurrentSpan) { | ||
| this.headerBuilder.withSpanId(spanId); | ||
| this.headerBuilder.withOperationName(operationName); | ||
| this.referrer = this.headerBuilder.build(); | ||
| } | ||
|
|
||
| // attach range for GetObject requests | ||
| attachRangeFromRequest(httpRequest, executionAttributes); | ||
|
|
||
| // for delete op, attach the number of files to delete | ||
| attachDeleteKeySizeAttribute(sdkRequest); | ||
|
|
||
|
|
||
|
||
| // build the referrer header | ||
| final String header = referrer.buildHttpReferrer(); | ||
| // update the outer class's field. | ||
|
|
@@ -400,11 +425,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, | |
| .appendHeader(HEADER_REFERRER, header) | ||
| .build(); | ||
| } | ||
|
|
||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("[{}] {} Executing {} with {}; {}", | ||
| currentThreadID(), | ||
| getSpanId(), | ||
| getOperationName(), | ||
| spanId, | ||
| operationName, | ||
| analyzer.analyze(context.request()), | ||
| header); | ||
| } | ||
|
|
@@ -533,10 +559,12 @@ public void beforeExecution(Context.BeforeExecution context, | |
| + analyzer.analyze(context.request()); | ||
| final String unaudited = getSpanId() + " " | ||
| + UNAUDITED_OPERATION + " " + error; | ||
| // If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it | ||
| // as an audited request. | ||
| if (isRequestNotAlwaysInSpan(context.request())) { | ||
| // can get by auditing during a copy, so don't overreact | ||
| // can get by auditing during a copy, so don't overreact. | ||
| LOG.debug(unaudited); | ||
| } else { | ||
| } else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) { | ||
| final RuntimeException ex = new AuditFailureException(unaudited); | ||
| LOG.debug(unaudited, ex); | ||
| if (isRejectOutOfSpan()) { | ||
|
|
@@ -547,5 +575,4 @@ public void beforeExecution(Context.BeforeExecution context, | |
| super.beforeExecution(context, executionAttributes); | ||
| } | ||
| } | ||
|
|
||
| } | ||
steveloughran marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; | ||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; | ||
| import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; | ||
| import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION; | ||
| import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; | ||
| import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; | ||
|
|
@@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable { | |
| verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); | ||
| fs.close(); | ||
| verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1); | ||
|
|
||
| // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE, | ||
| // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here | ||
| // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS: | ||
| // [0-8388607, 8388608-16777215, 16777216-21511173]. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. slick. These are parallel GETs aren't they?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yup, parallel GETs |
||
| verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); | ||
| } | ||
|
|
||
| @Test | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.