Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,9 @@ private HddsConfigKeys() {

public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_DNAUDIT =
"ozone.audit.log.debug.cmd.list.dnaudit";

/**
 * Configuration key for the datanode slow-operation warning threshold.
 */
public static final String HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_KEY =
"hdds.datanode.slow.op.warning.threshold";
/**
 * Default value for the slow-operation warning threshold, expressed as a
 * duration string with a millisecond suffix.
 */
public static final String HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_DEFAULT =
"500ms";
}
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3418,6 +3418,14 @@
Timeout for the request submitted directly to Ratis in datanode.
</description>
</property>
<property>
<name>hdds.datanode.slow.op.warning.threshold</name>
<tag>OZONE, DATANODE, PERFORMANCE</tag>
<value>500ms</value>
<description>
Latency threshold for datanode operations. An operation that takes at
least this long is written to the slow-operation (performance) audit log.
</description>
</property>
<property>
<name>ozone.om.keyname.character.check.enabled</name>
<tag>OZONE, OM</tag>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
Expand All @@ -101,6 +103,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private String clusterId;
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
private long slowOpThresholdMs;

/**
* Constructs an OzoneContainer that receives calls from
Expand All @@ -121,6 +124,7 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
this.tokenVerifier = tokenVerifier != null ? tokenVerifier
: new NoopTokenVerifier();
this.slowOpThresholdMs = getSlowOpThresholdMs(conf);

protocolMetrics =
new ProtocolMessageMetrics<>(
Expand Down Expand Up @@ -196,6 +200,7 @@ private ContainerCommandResponseProto dispatchRequest(
AuditAction action = getAuditAction(msg.getCmdType());
EventType eventType = getEventType(msg);
Map<String, String> params = getAuditParams(msg);
PerformanceStringBuilder perf = new PerformanceStringBuilder();

ContainerType containerType;
ContainerCommandResponseProto responseProto = null;
Expand Down Expand Up @@ -326,10 +331,11 @@ private ContainerCommandResponseProto dispatchRequest(
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
responseProto = handler.handle(msg, container, dispatcherContext);
long oPLatencyMS = Time.monotonicNow() - startTime;
if (responseProto != null) {
metrics.incContainerOpsLatencies(cmdType,
Time.monotonicNow() - startTime);
metrics.incContainerOpsLatencies(cmdType, oPLatencyMS);

// If the request is of Write Type and the container operation
// is unsuccessful, it implies the applyTransaction on the container
Expand Down Expand Up @@ -402,6 +408,8 @@ private ContainerCommandResponseProto dispatchRequest(
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
perf.appendOpLatencyMs(oPLatencyMS);
performanceAudit(action, params, perf, oPLatencyMS);

return responseProto;
} else {
Expand All @@ -412,6 +420,13 @@ private ContainerCommandResponseProto dispatchRequest(
}
}

/**
 * Looks up the slow-operation warning threshold in the given configuration.
 *
 * @param config configuration source queried for the threshold
 * @return the value returned by the time-duration lookup, requested in
 *         milliseconds
 */
private long getSlowOpThresholdMs(ConfigurationSource config) {
  final String key =
      HddsConfigKeys.HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_KEY;
  final String fallback =
      HddsConfigKeys.HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_DEFAULT;
  return config.getTimeDuration(key, fallback, TimeUnit.MILLISECONDS);
}

private void updateBCSID(Container container,
DispatcherContext dispatcherContext, Type cmdType) {
if (dispatcherContext != null && (cmdType == Type.PutBlock
Expand Down Expand Up @@ -682,6 +697,26 @@ private void audit(AuditAction action, EventType eventType,
}
}

/**
 * Emits a performance audit entry for an operation, but only when the
 * operation's latency is classified as slow.
 *
 * @param action audit action describing the operation
 * @param params audit parameters attached to the message
 * @param performance performance details attached to the message
 * @param opLatencyMs latency of the operation in milliseconds, used for
 *                    the slow-operation check
 */
private void performanceAudit(AuditAction action, Map<String, String> params,
    PerformanceStringBuilder performance, long opLatencyMs) {
  // Fast operations produce no performance audit entry at all.
  if (!isOperationSlow(opLatencyMs)) {
    return;
  }
  AUDIT.logPerformance(
      buildAuditMessageForPerformance(action, params, performance));
}

/**
 * Builds the audit message used for performance logging.
 * The user and IP are both set to {@code null}.
 *
 * @param op audit action the message is for
 * @param auditMap audit parameters to include in the message
 * @param performance performance details to include in the message
 * @return the built audit message
 */
public AuditMessage buildAuditMessageForPerformance(AuditAction op,
    Map<String, String> auditMap, PerformanceStringBuilder performance) {
  final AuditMessage message = new AuditMessage.Builder()
      .setUser(null)
      .atIp(null)
      .forOperation(op)
      .withParams(auditMap)
      .setPerformance(performance)
      .build();
  return message;
}

//TODO: use GRPC to fetch user and ip details
@Override
public AuditMessage buildAuditMessageForSuccess(AuditAction op,
Expand Down Expand Up @@ -845,6 +880,8 @@ private static Map<String, String> getAuditParams(
case ReadChunk:
auditParams.put("blockData",
BlockID.getFromProtobuf(msg.getReadChunk().getBlockID()).toString());
auditParams.put("blockDataSize",
String.valueOf(msg.getReadChunk().getChunkData().getLen()));
return auditParams;

case DeleteChunk:
Expand All @@ -857,6 +894,8 @@ private static Map<String, String> getAuditParams(
auditParams.put("blockData",
BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID())
.toString());
auditParams.put("blockDataSize",
String.valueOf(msg.getWriteChunk().getChunkData().getLen()));
return auditParams;

case ListChunk:
Expand All @@ -873,6 +912,8 @@ private static Map<String, String> getAuditParams(
auditParams.put("blockData",
BlockData.getFromProtoBuf(msg.getPutSmallFile()
.getBlock().getBlockData()).toString());
auditParams.put("blockDataSize",
String.valueOf(msg.getPutSmallFile().getChunkInfo().getLen()));
} catch (IOException ex) {
if (LOG.isTraceEnabled()) {
LOG.trace("Encountered error parsing BlockData from protobuf: "
Expand Down Expand Up @@ -904,4 +945,7 @@ private static Map<String, String> getAuditParams(

}

/**
 * Tells whether an operation's latency reaches the slow-operation threshold.
 *
 * @param opLatencyMs latency of the operation in milliseconds
 * @return {@code true} when the latency is greater than or equal to
 *         {@code slowOpThresholdMs}
 */
private boolean isOperationSlow(long opLatencyMs) {
  // A latency exactly equal to the threshold already counts as slow.
  return slowOpThresholdMs <= opLatencyMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ ContainerCommandResponseProto handleWriteChunk(
WriteChunkRequestProto writeChunk = request.getWriteChunk();
BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
ContainerProtos.ChunkInfo chunkInfoProto = writeChunk.getChunkData();

ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
Preconditions.checkNotNull(chunkInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AuditLogger {
private static final Marker WRITE_MARKER = AuditMarker.WRITE.getMarker();
private static final Marker READ_MARKER = AuditMarker.READ.getMarker();
private static final Marker AUTH_MARKER = AuditMarker.AUTH.getMarker();
private static final Marker PERFORMANCE = AuditMarker.PERFORMANCE.getMarker();
private final AtomicReference<Set<String>> debugCmdSetRef =
new AtomicReference<>(new HashSet<>());
public static final String AUDIT_LOG_DEBUG_CMD_LIST_PREFIX =
Expand Down Expand Up @@ -118,6 +119,10 @@ public void logWrite(AuditMessage auditMessage) {
}
}

/**
 * Logs the given audit message at INFO level with the PERFORMANCE marker,
 * via the logger's {@code logIfEnabled} call.
 *
 * @param msg audit message to log
 */
public void logPerformance(AuditMessage msg) {
  logger.logIfEnabled(
      FQCN,
      Level.INFO,
      PERFORMANCE,
      msg,
      null);
}

public void refreshDebugCmdSet() {
OzoneConfiguration conf = new OzoneConfiguration();
refreshDebugCmdSet(conf);
Expand Down Expand Up @@ -161,6 +166,22 @@ public void appendOpLatencyNanos(long nanos) {
append("opLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
}

/**
 * Appends the pre-operation latency in milliseconds under the
 * "preOpLatencyMs" key.
 * @param millis Latency in milliseconds.
 */
public void appendPreOpLatencyMs(long millis) {
append("preOpLatencyMs", millis);
}

/**
 * Appends the whole-operation latency in milliseconds under the
 * "opLatencyMs" key.
 * @param millis Latency in milliseconds.
 */
public void appendOpLatencyMs(long millis) {
append("opLatencyMs", millis);
}

/**
* Appends the size in bytes.
* @param bytes Size in bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
public enum AuditMarker {
WRITE(MarkerManager.getMarker("WRITE")),
READ(MarkerManager.getMarker("READ")),
AUTH(MarkerManager.getMarker("AUTH")),;
AUTH(MarkerManager.getMarker("AUTH")),
PERFORMANCE(MarkerManager.getMarker("PERFORMANCE"));

private Marker marker;

Expand Down