Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
Expand All @@ -56,6 +57,9 @@ public final class ContainerUtils {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerUtils.class);

private static final ByteString REDACTED =
ByteString.copyFromUtf8("<redacted>");

private ContainerUtils() {
//never constructed.
}
Expand All @@ -73,7 +77,7 @@ public static ContainerCommandResponseProto logAndReturnError(
ContainerCommandRequestProto request) {
String logInfo = "Operation: {} , Trace ID: {} , Message: {} , " +
"Result: {} , StorageContainerException Occurred.";
log.info(logInfo, request.getCmdType().name(), request.getTraceID(),
log.info(logInfo, request.getCmdType(), request.getTraceID(),
ex.getMessage(), ex.getResult().getValueDescriptor().getName(), ex);
return getContainerCommandResponse(request, ex.getResult(), ex.getMessage())
.build();
Expand Down Expand Up @@ -101,7 +105,6 @@ public static long getContainerIDFromFile(File containerFile) {
* Verifies that this is indeed a new container.
*
* @param containerFile - Container File to verify
* @throws FileAlreadyExistsException
*/
public static void verifyIsNewContainer(File containerFile) throws
FileAlreadyExistsException {
Expand All @@ -125,7 +128,7 @@ public static String getContainerDbFileName(String containerName) {
*
* @throws IOException when read/write error occurs
*/
public synchronized static void writeDatanodeDetailsTo(
public static synchronized void writeDatanodeDetailsTo(
DatanodeDetails datanodeDetails, File path) throws IOException {
if (path.exists()) {
if (!path.delete() || !path.createNewFile()) {
Expand All @@ -147,15 +150,15 @@ public synchronized static void writeDatanodeDetailsTo(
* @return {@link DatanodeDetails}
* @throws IOException If the id file is malformed or other I/O exceptions
*/
public synchronized static DatanodeDetails readDatanodeDetailsFrom(File path)
public static synchronized DatanodeDetails readDatanodeDetailsFrom(File path)
throws IOException {
if (!path.exists()) {
throw new IOException("Datanode ID file not found.");
}
try {
return DatanodeIdYaml.readDatanodeIdFile(path);
} catch (IOException e) {
LOG.warn("Error loading DatanodeDetails yaml from " +
LOG.warn("Error loading DatanodeDetails yaml from {}",
path.getAbsolutePath(), e);
// Try to load as protobuf before giving up
try (FileInputStream in = new FileInputStream(path)) {
Expand All @@ -171,8 +174,6 @@ public synchronized static DatanodeDetails readDatanodeDetailsFrom(File path)
/**
* Verify that the checksum stored in containerData is equal to the
* computed checksum.
* @param containerData
* @throws IOException
*/
public static void verifyChecksum(ContainerData containerData)
throws IOException {
Expand All @@ -196,7 +197,6 @@ public static void verifyChecksum(ContainerData containerData)
* Return the SHA-256 checksum of the containerData.
* @param containerDataYamlStr ContainerData as a Yaml String
* @return Checksum of the container data
* @throws StorageContainerException
*/
public static String getChecksum(String containerDataYamlStr)
throws StorageContainerException {
Expand Down Expand Up @@ -231,4 +231,54 @@ public static File getContainerFile(File containerBaseDir) {
public static long getContainerID(File containerBaseDir) {
return Long.parseLong(containerBaseDir.getName());
}

/**
* Remove binary data from request {@code msg}. (May be incomplete, feel
* free to add any missing cleanups.)
*/
public static ContainerCommandRequestProto processForDebug(
ContainerCommandRequestProto msg) {

if (msg == null) {
return null;
}

if (msg.hasWriteChunk() || msg.hasPutSmallFile()) {
ContainerCommandRequestProto.Builder builder = msg.toBuilder();
if (msg.hasWriteChunk()) {
builder.getWriteChunkBuilder().setData(REDACTED);
}
if (msg.hasPutSmallFile()) {
builder.getPutSmallFileBuilder().setData(REDACTED);
}
return builder.build();
}

return msg;
}

/**
* Remove binary data from response {@code msg}. (May be incomplete, feel
* free to add any missing cleanups.)
*/
public static ContainerCommandResponseProto processForDebug(
ContainerCommandResponseProto msg) {

if (msg == null) {
return null;
}

if (msg.hasReadChunk() || msg.hasGetSmallFile()) {
ContainerCommandResponseProto.Builder builder = msg.toBuilder();
if (msg.hasReadChunk()) {
builder.getReadChunkBuilder().setData(REDACTED);
}
if (msg.hasGetSmallFile()) {
builder.getGetSmallFileBuilder().getDataBuilder().setData(REDACTED);
}
return builder.build();
}

return msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
this.dispatcher =
new OzoneProtocolMessageDispatcher<>("DatanodeClient",
protocolMetrics,
LOG);
LOG,
ContainerUtils::processForDebug,
ContainerUtils::processForDebug);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.opentracing.Span;
import org.slf4j.Logger;

import java.util.function.UnaryOperator;

/**
* Dispatch message after tracing and message logging for insight.
* <p>
Expand All @@ -35,19 +37,31 @@
*/
public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE, TYPE> {

private String serviceName;
private final String serviceName;

private final ProtocolMessageMetrics<TYPE>
protocolMessageMetrics;

private Logger logger;
private final Logger logger;
private final UnaryOperator<REQUEST> requestPreprocessor;
private final UnaryOperator<RESPONSE> responsePreprocessor;

public OzoneProtocolMessageDispatcher(String serviceName,
ProtocolMessageMetrics<TYPE> protocolMessageMetrics,
Logger logger) {
this(serviceName, protocolMessageMetrics, logger, req -> req, resp -> resp);
}

public OzoneProtocolMessageDispatcher(String serviceName,
ProtocolMessageMetrics<TYPE> protocolMessageMetrics,
Logger logger,
UnaryOperator<REQUEST> requestPreprocessor,
UnaryOperator<RESPONSE> responsePreprocessor) {
this.serviceName = serviceName;
this.protocolMessageMetrics = protocolMessageMetrics;
this.logger = logger;
this.requestPreprocessor = requestPreprocessor;
this.responsePreprocessor = responsePreprocessor;
}

public RESPONSE processRequest(
Expand All @@ -61,11 +75,11 @@ public RESPONSE processRequest(
logger.trace(
"[service={}] [type={}] request is received: <json>{}</json>",
serviceName,
type.toString(),
request.toString().replaceAll("\n", "\\\\n"));
type,
escapeNewLines(requestPreprocessor.apply(request)));
} else if (logger.isDebugEnabled()) {
logger.debug("{} {} request is received",
serviceName, type.toString());
serviceName, type);
}

long startTime = System.nanoTime();
Expand All @@ -79,13 +93,17 @@ public RESPONSE processRequest(
"[service={}] [type={}] request is processed. Response: "
+ "<json>{}</json>",
serviceName,
type.toString(),
response.toString().replaceAll("\n", "\\\\n"));
type,
escapeNewLines(responsePreprocessor.apply(response)));
}
return response;

} finally {
span.finish();
}
}

private static String escapeNewLines(Object input) {
return input.toString().replaceAll("\n", "\\\\n");
}
}