Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private static final String AUDIT_PARAM_FORCE_DELETE = "forceDelete";
private static final String AUDIT_PARAM_START_CONTAINER_ID = "startContainerID";
private static final String AUDIT_PARAM_BLOCK_DATA = "blockData";
private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "blockDataSize";
private static final String AUDIT_PARAM_BLOCK_DATA_OFFSET = "offset";
private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "size";
private static final String AUDIT_PARAM_BLOCK_DATA_STAGE = "stage";
private static final String AUDIT_PARAM_COUNT = "count";
private static final String AUDIT_PARAM_START_LOCAL_ID = "startLocalID";
private static final String AUDIT_PARAM_PREV_CHUNKNAME = "prevChunkName";
Expand All @@ -112,7 +114,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private String clusterId;
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
private long slowOpThresholdMs;
private long slowOpThresholdNs;
private VolumeUsage.MinFreeSpaceCalculator freeSpaceCalculator;

/**
Expand All @@ -134,7 +136,7 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
this.tokenVerifier = tokenVerifier != null ? tokenVerifier
: new NoopTokenVerifier();
this.slowOpThresholdMs = getSlowOpThresholdMs(conf);
this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000;

protocolMetrics =
new ProtocolMessageMetrics<>(
Expand Down Expand Up @@ -279,7 +281,7 @@ private ContainerCommandResponseProto dispatchRequest(
"ContainerID " + containerID
+ " has been lost and cannot be recreated on this DataNode",
ContainerProtos.Result.CONTAINER_MISSING);
audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}

Expand All @@ -306,7 +308,7 @@ private ContainerCommandResponseProto dispatchRequest(
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " creation failed",
responseProto.getResult());
audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
Expand All @@ -325,13 +327,13 @@ private ContainerCommandResponseProto dispatchRequest(
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
audit(action, eventType, msg, AuditEventStatus.FAILURE,
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception("MALFORMED_REQUEST"));
return malformedRequest(msg);
}
Expand All @@ -348,10 +350,10 @@ private ContainerCommandResponseProto dispatchRequest(
"ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
// log failure
audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime);
responseProto = handler.handle(msg, container, dispatcherContext);
long opLatencyNs = Time.monotonicNowNanos() - startTime;
if (responseProto != null) {
Expand Down Expand Up @@ -417,24 +419,24 @@ private ContainerCommandResponseProto dispatchRequest(
}
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, msg, AuditEventStatus.SUCCESS, null);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.SUCCESS, null);
} else {
//TODO HDDS-7096:
// This is a too general place for on demand scanning.
// Create a specific exception that signals for on demand scanning
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
OnDemandContainerDataScanner.scanContainer(container);
audit(action, eventType, msg, AuditEventStatus.FAILURE,
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
perf.appendOpLatencyMs(opLatencyNs);
performanceAudit(action, msg, perf, opLatencyNs);
perf.appendOpLatencyNanos(opLatencyNs);
performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs);

return responseProto;
} else {
// log failure
audit(action, eventType, msg, AuditEventStatus.FAILURE,
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception("UNSUPPORTED_REQUEST"));
return unsupportedRequest(msg);
}
Expand Down Expand Up @@ -547,7 +549,7 @@ public void validateContainerCommand(
StorageContainerException ex = new StorageContainerException(
"Invalid ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
audit(action, eventType, msg, null, AuditEventStatus.FAILURE, ex);
throw ex;
}

Expand All @@ -567,12 +569,12 @@ public void validateContainerCommand(
// if the container is not open/recovering, no updates can happen. Just
// throw an exception
ContainerNotOpenException cex = new ContainerNotOpenException(log);
audit(action, eventType, msg, AuditEventStatus.FAILURE, cex);
audit(action, eventType, msg, null, AuditEventStatus.FAILURE, cex);
throw cex;
}
} else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
InvalidContainerStateException iex = new InvalidContainerStateException(log);
audit(action, eventType, msg, AuditEventStatus.FAILURE, iex);
audit(action, eventType, msg, null, AuditEventStatus.FAILURE, iex);
throw iex;
}
}
Expand Down Expand Up @@ -678,14 +680,14 @@ private EventType getEventType(ContainerCommandRequestProto msg) {
}

private void audit(AuditAction action, EventType eventType,
ContainerCommandRequestProto msg, AuditEventStatus result,
Throwable exception) {
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext,
AuditEventStatus result, Throwable exception) {
Map<String, String> params;
AuditMessage amsg;
switch (result) {
case SUCCESS:
if (isAllowed(action.getAction())) {
params = getAuditParams(msg);
params = getAuditParams(msg, dispatcherContext);
if (eventType == EventType.READ &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
Expand All @@ -699,7 +701,7 @@ private void audit(AuditAction action, EventType eventType,
break;

case FAILURE:
params = getAuditParams(msg);
params = getAuditParams(msg, dispatcherContext);
if (eventType == EventType.READ &&
AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForFailure(action, params, exception);
Expand All @@ -719,9 +721,9 @@ private void audit(AuditAction action, EventType eventType,
}

private void performanceAudit(AuditAction action, ContainerCommandRequestProto msg,
PerformanceStringBuilder performance, long opLatencyMs) {
if (isOperationSlow(opLatencyMs)) {
Map<String, String> params = getAuditParams(msg);
DispatcherContext dispatcherContext, PerformanceStringBuilder performance, long opLatencyNs) {
if (isOperationSlow(opLatencyNs)) {
Map<String, String> params = getAuditParams(msg, dispatcherContext);
AuditMessage auditMessage =
buildAuditMessageForPerformance(action, params, performance);
AUDIT.logPerformance(auditMessage);
Expand Down Expand Up @@ -837,7 +839,7 @@ private static DNAction getAuditAction(Type cmdType) {
}

private static Map<String, String> getAuditParams(
ContainerCommandRequestProto msg) {
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Map<String, String> auditParams = new TreeMap<>();
Type cmdType = msg.getCmdType();
String containerID = String.valueOf(msg.getContainerID());
Expand Down Expand Up @@ -904,6 +906,8 @@ private static Map<String, String> getAuditParams(
case ReadChunk:
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockID.getFromProtobuf(msg.getReadChunk().getBlockID()).toString());
auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
String.valueOf(msg.getReadChunk().getChunkData().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getReadChunk().getChunkData().getLen()));
return auditParams;
Expand All @@ -918,8 +922,13 @@ private static Map<String, String> getAuditParams(
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID())
.toString());
auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
String.valueOf(msg.getWriteChunk().getChunkData().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getWriteChunk().getChunkData().getLen()));
if (dispatcherContext != null && dispatcherContext.getStage() != null) {
auditParams.put(AUDIT_PARAM_BLOCK_DATA_STAGE, dispatcherContext.getStage().toString());
}
return auditParams;

case ListChunk:
Expand All @@ -936,6 +945,8 @@ private static Map<String, String> getAuditParams(
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockData.getFromProtoBuf(msg.getPutSmallFile()
.getBlock().getBlockData()).toString());
auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
String.valueOf(msg.getPutSmallFile().getChunkInfo().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getPutSmallFile().getChunkInfo().getLen()));
} catch (IOException ex) {
Expand Down Expand Up @@ -975,7 +986,7 @@ private static Map<String, String> getAuditParams(

}

private boolean isOperationSlow(long opLatencyMs) {
return opLatencyMs >= slowOpThresholdMs;
private boolean isOperationSlow(long opLatencyNs) {
return opLatencyNs >= slowOpThresholdNs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,20 @@ public void appendOpLatencyNanos(long nanos) {

/**
* Appends pre-operation operation latency in milliseconds.
* @param millis Latency in nanoseconds.
* @param millis Latency in milliseconds.
*/
public void appendPreOpLatencyMs(long millis) {
append("preOpLatencyMs", millis);
}

/**
* Appends pre-operation operation latency in milliseconds.
* @param nanos Latency in nanoseconds.
*/
public void appendPreOpLatencyNano(long nanos) {
append("preOpLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
}

/**
* Appends whole operation latency in milliseconds.
* @param millis Latency in milliseconds.
Expand Down