diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index cac4df73cc71..c5855b38b74e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -96,7 +96,9 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { private static final String AUDIT_PARAM_FORCE_DELETE = "forceDelete"; private static final String AUDIT_PARAM_START_CONTAINER_ID = "startContainerID"; private static final String AUDIT_PARAM_BLOCK_DATA = "blockData"; - private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "blockDataSize"; + private static final String AUDIT_PARAM_BLOCK_DATA_OFFSET = "offset"; + private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "size"; + private static final String AUDIT_PARAM_BLOCK_DATA_STAGE = "stage"; private static final String AUDIT_PARAM_COUNT = "count"; private static final String AUDIT_PARAM_START_LOCAL_ID = "startLocalID"; private static final String AUDIT_PARAM_PREV_CHUNKNAME = "prevChunkName"; @@ -112,7 +114,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { private String clusterId; private ContainerMetrics metrics; private final TokenVerifier tokenVerifier; - private long slowOpThresholdMs; + private long slowOpThresholdNs; private VolumeUsage.MinFreeSpaceCalculator freeSpaceCalculator; /** @@ -134,7 +136,7 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet, HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT); this.tokenVerifier = tokenVerifier != null ? tokenVerifier : new NoopTokenVerifier(); - this.slowOpThresholdMs = getSlowOpThresholdMs(conf); + this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000; protocolMetrics = new ProtocolMessageMetrics<>( @@ -279,7 +281,7 @@ private ContainerCommandResponseProto dispatchRequest( "ContainerID " + containerID + " has been lost and cannot be recreated on this DataNode", ContainerProtos.Result.CONTAINER_MISSING); - audit(action, eventType, msg, AuditEventStatus.FAILURE, sce); + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce); return ContainerUtils.logAndReturnError(LOG, sce, msg); } @@ -306,7 +308,7 @@ private ContainerCommandResponseProto dispatchRequest( StorageContainerException sce = new StorageContainerException( "ContainerID " + containerID + " creation failed", responseProto.getResult()); - audit(action, eventType, msg, AuditEventStatus.FAILURE, sce); + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce); return ContainerUtils.logAndReturnError(LOG, sce, msg); } Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null @@ -325,13 +327,13 @@ private ContainerCommandResponseProto dispatchRequest( StorageContainerException sce = new StorageContainerException( "ContainerID " + containerID + " does not exist", ContainerProtos.Result.CONTAINER_NOT_FOUND); - audit(action, eventType, msg, AuditEventStatus.FAILURE, sce); + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce); return ContainerUtils.logAndReturnError(LOG, sce, msg); } containerType = getContainerType(container); } else { if (!msg.hasCreateContainer()) { - audit(action, eventType, msg, AuditEventStatus.FAILURE, + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, new Exception("MALFORMED_REQUEST")); return malformedRequest(msg); } @@ -348,10 +350,10 @@ private ContainerCommandResponseProto dispatchRequest( "ContainerType " + containerType, ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); // log failure - audit(action, eventType, msg, AuditEventStatus.FAILURE, ex); + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, ex); return ContainerUtils.logAndReturnError(LOG, ex, msg); } - perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime); + perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime); responseProto = handler.handle(msg, container, dispatcherContext); long opLatencyNs = Time.monotonicNowNanos() - startTime; if (responseProto != null) { @@ -417,7 +419,7 @@ private ContainerCommandResponseProto dispatchRequest( } if (result == Result.SUCCESS) { updateBCSID(container, dispatcherContext, cmdType); - audit(action, eventType, msg, AuditEventStatus.SUCCESS, null); + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.SUCCESS, null); } else { //TODO HDDS-7096: // This is a too general place for on demand scanning. @@ -425,16 +427,16 @@ private ContainerCommandResponseProto dispatchRequest( // and move this general scan to where it is more appropriate. // Add integration tests to test the full functionality. OnDemandContainerDataScanner.scanContainer(container); - audit(action, eventType, msg, AuditEventStatus.FAILURE, + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, new Exception(responseProto.getMessage())); } - perf.appendOpLatencyMs(opLatencyNs); - performanceAudit(action, msg, perf, opLatencyNs); + perf.appendOpLatencyNanos(opLatencyNs); + performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs); return responseProto; } else { // log failure - audit(action, eventType, msg, AuditEventStatus.FAILURE, + audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, new Exception("UNSUPPORTED_REQUEST")); return unsupportedRequest(msg); } @@ -547,7 +549,7 @@ public void validateContainerCommand( StorageContainerException ex = new StorageContainerException( "Invalid ContainerType " + containerType, ContainerProtos.Result.CONTAINER_INTERNAL_ERROR); - audit(action, eventType, msg, AuditEventStatus.FAILURE, ex); + audit(action, eventType, msg, null, AuditEventStatus.FAILURE, ex); throw ex; } @@ -567,12 +569,12 @@ public void validateContainerCommand( // if the container is not open/recovering, no updates can happen. Just // throw an exception ContainerNotOpenException cex = new ContainerNotOpenException(log); - audit(action, eventType, msg, AuditEventStatus.FAILURE, cex); + audit(action, eventType, msg, null, AuditEventStatus.FAILURE, cex); throw cex; } } else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) { InvalidContainerStateException iex = new InvalidContainerStateException(log); - audit(action, eventType, msg, AuditEventStatus.FAILURE, iex); + audit(action, eventType, msg, null, AuditEventStatus.FAILURE, iex); throw iex; } } @@ -678,14 +680,14 @@ private EventType getEventType(ContainerCommandRequestProto msg) { } private void audit(AuditAction action, EventType eventType, - ContainerCommandRequestProto msg, AuditEventStatus result, - Throwable exception) { + ContainerCommandRequestProto msg, DispatcherContext dispatcherContext, + AuditEventStatus result, Throwable exception) { Map params; AuditMessage amsg; switch (result) { case SUCCESS: if (isAllowed(action.getAction())) { - params = getAuditParams(msg); + params = getAuditParams(msg, dispatcherContext); if (eventType == EventType.READ && AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) { amsg = buildAuditMessageForSuccess(action, params); @@ -699,7 +701,7 @@ private void audit(AuditAction action, EventType eventType, break; case FAILURE: - params = getAuditParams(msg); + params = getAuditParams(msg, dispatcherContext); if (eventType == EventType.READ && AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) { amsg = buildAuditMessageForFailure(action, params, exception); @@ -719,9 +721,9 @@ private void audit(AuditAction action, EventType eventType, } private void performanceAudit(AuditAction action, ContainerCommandRequestProto msg, - PerformanceStringBuilder performance, long opLatencyMs) { - if (isOperationSlow(opLatencyMs)) { - Map params = getAuditParams(msg); + DispatcherContext dispatcherContext, PerformanceStringBuilder performance, long opLatencyNs) { + if (isOperationSlow(opLatencyNs)) { + Map params = getAuditParams(msg, dispatcherContext); AuditMessage auditMessage = buildAuditMessageForPerformance(action, params, performance); AUDIT.logPerformance(auditMessage); @@ -837,7 +839,7 @@ private static DNAction getAuditAction(Type cmdType) { } private static Map getAuditParams( - ContainerCommandRequestProto msg) { + ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) { Map auditParams = new TreeMap<>(); Type cmdType = msg.getCmdType(); String containerID = String.valueOf(msg.getContainerID()); @@ -904,6 +906,8 @@ private static Map getAuditParams( case ReadChunk: auditParams.put(AUDIT_PARAM_BLOCK_DATA, BlockID.getFromProtobuf(msg.getReadChunk().getBlockID()).toString()); + auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET, + String.valueOf(msg.getReadChunk().getChunkData().getOffset())); auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE, String.valueOf(msg.getReadChunk().getChunkData().getLen())); return auditParams; @@ -918,8 +922,13 @@ private static Map getAuditParams( auditParams.put(AUDIT_PARAM_BLOCK_DATA, BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID()) .toString()); + auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET, + String.valueOf(msg.getWriteChunk().getChunkData().getOffset())); auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE, String.valueOf(msg.getWriteChunk().getChunkData().getLen())); + if (dispatcherContext != null && dispatcherContext.getStage() != null) { + auditParams.put(AUDIT_PARAM_BLOCK_DATA_STAGE, dispatcherContext.getStage().toString()); + } return auditParams; case ListChunk: @@ -936,6 +945,8 @@ private static Map getAuditParams( auditParams.put(AUDIT_PARAM_BLOCK_DATA, BlockData.getFromProtoBuf(msg.getPutSmallFile() .getBlock().getBlockData()).toString()); + auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET, + String.valueOf(msg.getPutSmallFile().getChunkInfo().getOffset())); auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE, String.valueOf(msg.getPutSmallFile().getChunkInfo().getLen())); } catch (IOException ex) { @@ -975,7 +986,7 @@ private static Map getAuditParams( } - private boolean isOperationSlow(long opLatencyMs) { - return opLatencyMs >= slowOpThresholdMs; + private boolean isOperationSlow(long opLatencyNs) { + return opLatencyNs >= slowOpThresholdNs; } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java index 042887e4e533..5e8996df1727 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/ozone/audit/AuditLogger.java @@ -168,12 +168,20 @@ public void appendOpLatencyNanos(long nanos) { /** * Appends pre-operation operation latency in milliseconds. - * @param millis Latency in nanoseconds. + * @param millis Latency in milliseconds. */ public void appendPreOpLatencyMs(long millis) { append("preOpLatencyMs", millis); } + /** + * Appends pre-operation operation latency in milliseconds. + * @param nanos Latency in nanoseconds. + */ + public void appendPreOpLatencyNano(long nanos) { + append("preOpLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos)); + } + /** * Appends whole operation latency in milliseconds. * @param millis Latency in milliseconds.