File: HddsDispatcher.java
@@ -23,22 +23,23 @@
 import java.util.Optional;
 import java.util.Set;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -60,9 +61,7 @@

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.opentracing.Scope;
-import io.opentracing.Span;
-import io.opentracing.util.GlobalTracer;
+import com.google.protobuf.ServiceException;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
 
@@ -86,6 +85,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
   private final StateContext context;
   private final float containerCloseThreshold;
   private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;
+  private OzoneProtocolMessageDispatcher<ContainerCommandRequestProto,
+      ContainerCommandResponseProto, ProtocolMessageEnum> dispatcher;
   private String scmID;
   private ContainerMetrics metrics;
   private final TokenVerifier tokenVerifier;
@@ -117,7 +118,12 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
         new ProtocolMessageMetrics<>(
             "HddsDispatcher",
             "HDDS dispatcher metrics",
-            ContainerProtos.Type.values());
+            Type.values());
+
+    this.dispatcher =
+        new OzoneProtocolMessageDispatcher<>("DatanodeClient",
+            protocolMetrics,
+            LOG);
   }
 
   @Override
@@ -158,16 +164,13 @@ public void buildMissingContainerSetAndValidate(
   @Override
   public ContainerCommandResponseProto dispatch(
       ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
-    String spanName = "HddsDispatcher." + msg.getCmdType().name();
-    long startTime = System.nanoTime();
-    Span span = TracingUtil
-        .importAndCreateSpan(spanName, msg.getTraceID());
-    try (Scope scope = GlobalTracer.get().activateSpan(span)) {
-      return dispatchRequest(msg, dispatcherContext);
-    } finally {
-      span.finish();
-      protocolMetrics
-          .increment(msg.getCmdType(), System.nanoTime() - startTime);
+    try {
+      return dispatcher.processRequest(msg,
+          req -> dispatchRequest(msg, dispatcherContext),
+          msg.getCmdType(),
+          msg.getTraceID());
+    } catch (ServiceException ex) {
+      throw new RuntimeException(ex);
     }
   }

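Note on the new dispatch() body: the lambda ignores its req parameter because msg is already captured, and ServiceException is wrapped in a RuntimeException because the overridden dispatch() signature declares no checked exceptions. A behaviorally equivalent sketch, assuming processRequest wraps the handler in a tracing span plus a metrics update (see OzoneProtocolMessageDispatcher further down):

    // Sketch only; not part of this diff.
    ContainerCommandResponseProto dispatchViaDispatcher(
        ContainerCommandRequestProto msg, DispatcherContext ctx)
        throws ServiceException {
      return dispatcher.processRequest(
          msg,                               // request, logged at TRACE/DEBUG
          req -> dispatchRequest(req, ctx),  // the actual dispatch logic
          msg.getCmdType(),                  // metrics key and span name
          msg.getTraceID());                 // trace correlation id
    }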
@@ -189,16 +192,16 @@ private ContainerCommandResponseProto dispatchRequest(
     ContainerType containerType;
     ContainerCommandResponseProto responseProto = null;
     long startTime = System.nanoTime();
-    ContainerProtos.Type cmdType = msg.getCmdType();
+    Type cmdType = msg.getCmdType();
     long containerID = msg.getContainerID();
     metrics.incContainerOpsMetrics(cmdType);
     Container container = getContainer(containerID);
     boolean isWriteStage =
-        (cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
+        (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
             == DispatcherContext.WriteChunkStage.WRITE_DATA);
     boolean isWriteCommitStage =
-        (cmdType == ContainerProtos.Type.WriteChunk && dispatcherContext != null
+        (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
             == DispatcherContext.WriteChunkStage.COMMIT_DATA);
 
@@ -213,7 +216,7 @@ private ContainerCommandResponseProto dispatchRequest(
     // if the command gets executed other than Ratis, the default write stage
     // is WriteChunkStage.COMBINED
     boolean isCombinedStage =
-        cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext == null
+        cmdType == Type.WriteChunk && (dispatcherContext == null
             || dispatcherContext.getStage()
             == DispatcherContext.WriteChunkStage.COMBINED);
     Map<Long, Long> container2BCSIDMap = null;
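Reading aid for the three WriteChunk predicates above: under Ratis a WriteChunk is split into a WRITE_DATA stage and a COMMIT_DATA stage, while any non-Ratis path uses the single COMBINED stage (per the comment above). A condensed restatement, with stage standing in for dispatcherContext.getStage():

    // Sketch: same semantics as isWriteStage / isWriteCommitStage /
    // isCombinedStage above; a null dispatcherContext means non-Ratis.
    boolean ratisWriteData  = stage == WriteChunkStage.WRITE_DATA;
    boolean ratisCommitData = stage == WriteChunkStage.COMMIT_DATA;
    boolean combined = dispatcherContext == null
        || stage == WriteChunkStage.COMBINED;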
@@ -243,13 +246,13 @@
       return ContainerUtils.logAndReturnError(LOG, sce, msg);
     }
 
-    if (cmdType != ContainerProtos.Type.CreateContainer) {
+    if (cmdType != Type.CreateContainer) {
       /**
        * Create Container should happen only as part of Write_Data phase of
        * writeChunk.
        */
       if (container == null && ((isWriteStage || isCombinedStage)
-          || cmdType == ContainerProtos.Type.PutSmallFile)) {
+          || cmdType == Type.PutSmallFile)) {
         // If container does not exist, create one for WriteChunk and
         // PutSmallFile request
         responseProto = createContainer(msg);
@@ -317,7 +320,7 @@ private ContainerCommandResponseProto dispatchRequest(
       // state here.
 
       Result result = responseProto.getResult();
-      if (cmdType == ContainerProtos.Type.CreateContainer
+      if (cmdType == Type.CreateContainer
           && result == Result.SUCCESS && dispatcherContext != null) {
         Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
         container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
@@ -374,9 +377,9 @@
   }
 
   private void updateBCSID(Container container,
-      DispatcherContext dispatcherContext, ContainerProtos.Type cmdType) {
-    if (dispatcherContext != null && (cmdType == ContainerProtos.Type.PutBlock
-        || cmdType == ContainerProtos.Type.PutSmallFile)) {
+      DispatcherContext dispatcherContext, Type cmdType) {
+    if (dispatcherContext != null && (cmdType == Type.PutBlock
+        || cmdType == Type.PutSmallFile)) {
       Preconditions.checkNotNull(container);
       long bcsID = container.getBlockCommitSequenceId();
       long containerId = container.getContainerData().getContainerID();
@@ -406,7 +409,7 @@ ContainerCommandResponseProto createContainer(

     ContainerCommandRequestProto.Builder requestBuilder =
         ContainerCommandRequestProto.newBuilder()
-            .setCmdType(ContainerProtos.Type.CreateContainer)
+            .setCmdType(Type.CreateContainer)
             .setContainerID(containerRequest.getContainerID())
             .setCreateContainer(createRequest.build())
             .setPipelineID(containerRequest.getPipelineID())
@@ -448,7 +451,7 @@ public void validateContainerCommand(
       return;
     }
     ContainerType containerType = container.getContainerType();
-    ContainerProtos.Type cmdType = msg.getCmdType();
+    Type cmdType = msg.getCmdType();
     AuditAction action =
         ContainerCommandRequestPBHelper.getAuditAction(cmdType);
     EventType eventType = getEventType(msg);
File: StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -50,7 +50,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB

   private final StorageContainerDatanodeProtocol impl;
   private final OzoneProtocolMessageDispatcher<SCMDatanodeRequest,
-      SCMDatanodeResponse> dispatcher;
+      SCMDatanodeResponse, ProtocolMessageEnum> dispatcher;
 
   public StorageContainerDatanodeProtocolServerSideTranslatorPB(
       StorageContainerDatanodeProtocol impl,
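For context on how these translators use the dispatcher: each funnels its single RPC entry point through processRequest, so tracing, logging, and metrics live in one place. A sketch of that pattern (submitRequest and processMessage are illustrative names modeled on this class, not lines from this diff):

    // Sketch only: typical delegation inside a server-side translator.
    public SCMDatanodeResponse submitRequest(RpcController controller,
        SCMDatanodeRequest request) throws ServiceException {
      return dispatcher.processRequest(request, this::processMessage,
          request.getCmdType(), request.getTraceID());
    }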
File: OzoneProtocolMessageDispatcher.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
-import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.ServiceException;
 import io.opentracing.Span;
 import org.slf4j.Logger;
@@ -34,17 +33,17 @@
  * It logs the message type/content on DEBUG/TRACING log for insight and create
  * a new span based on the tracing information.
  */
-public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> {
+public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE, TYPE> {
 
   private String serviceName;
 
-  private final ProtocolMessageMetrics<ProtocolMessageEnum>
+  private final ProtocolMessageMetrics<TYPE>
       protocolMessageMetrics;
 
   private Logger logger;
 
   public OzoneProtocolMessageDispatcher(String serviceName,
-      ProtocolMessageMetrics<ProtocolMessageEnum> protocolMessageMetrics,
+      ProtocolMessageMetrics<TYPE> protocolMessageMetrics,
       Logger logger) {
     this.serviceName = serviceName;
     this.protocolMessageMetrics = protocolMessageMetrics;
@@ -54,7 +53,7 @@ public OzoneProtocolMessageDispatcher(String serviceName,
   public RESPONSE processRequest(
       REQUEST request,
       FunctionWithServiceException<REQUEST, RESPONSE> methodCall,
-      ProtocolMessageEnum type,
+      TYPE type,
       String traceId) throws ServiceException {
     Span span = TracingUtil.importAndCreateSpan(type.toString(), traceId);
     try {
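The remainder of processRequest is collapsed in this view. A plausible continuation of the try block above, assuming it performs the logging and metrics work that HddsDispatcher.dispatch() previously did inline (an assumption, not the verbatim body):

    // Sketch of the collapsed continuation (assumed, not verbatim).
    long startTime = System.nanoTime();
    if (logger.isTraceEnabled()) {
      logger.trace("{} {} request is received: {}",
          serviceName, type, request);
    }
    RESPONSE response = methodCall.apply(request); // may throw ServiceException
    protocolMessageMetrics.increment(type, System.nanoTime() - startTime);
    return response;
    // ...with a finally block calling span.finish() to close the span.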
File: SCMSecurityProtocolServerSideTranslatorPB.java
@@ -16,19 +16,14 @@
  */
 package org.apache.hadoop.hdds.scm.protocol;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto.ResponseCode;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto.ResponseCode;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
@@ -37,6 +32,12 @@
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * This class is the server-side translator that forwards requests received on
  * {@link SCMSecurityProtocolPB} to the {@link
@@ -51,7 +52,7 @@ public class SCMSecurityProtocolServerSideTranslatorPB
   private final SCMSecurityProtocol impl;
 
   private OzoneProtocolMessageDispatcher<SCMSecurityRequest,
-      SCMSecurityResponse>
+      SCMSecurityResponse, ProtocolMessageEnum>
       dispatcher;
 
   public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl,
File: ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -68,7 +68,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
       .getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);
 
   private final OzoneProtocolMessageDispatcher<SCMBlockLocationRequest,
-      SCMBlockLocationResponse>
+      SCMBlockLocationResponse, ProtocolMessageEnum>
       dispatcher;
 
   /**
File: StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
@@ -39,21 +38,22 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineBatchResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
@@ -79,11 +79,10 @@
 import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.errorPipelineAlreadyExists;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto.Error.success;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -101,7 +100,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   private final StorageContainerLocationProtocol impl;
 
   private OzoneProtocolMessageDispatcher<ScmContainerLocationRequest,
-      ScmContainerLocationResponse>
+      ScmContainerLocationResponse, ProtocolMessageEnum>
       dispatcher;
 
   /**
File: BaseInsightPoint.java
@@ -19,6 +19,7 @@

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -28,11 +29,8 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
-import org.apache.hadoop.hdds.server.http.PrometheusMetricsSink;
 import org.apache.hadoop.ozone.insight.LoggerSource.Level;
 
-import com.google.protobuf.ProtocolMessageEnum;
-
 /**
  * Default implementation of Insight point logic.
  */
@@ -42,7 +40,7 @@ public abstract class BaseInsightPoint implements InsightPoint {
* List the related metrics.
*/
@Override
public List<MetricGroupDisplay> getMetrics() {
public List<MetricGroupDisplay> getMetrics(Map<String, String> filters) {
return new ArrayList<>();
}

@@ -91,21 +89,35 @@ public Level defaultLevel(boolean verbose) {
     return verbose ? Level.TRACE : Level.DEBUG;
   }
 
+  public void addProtocolMessageMetrics(
+      List<MetricGroupDisplay> metrics,
+      String prefix,
+      Component.Type type,
+      Object[] types
+  ) {
+    addProtocolMessageMetrics(metrics, prefix, new Component(type), types);
+  }
+
   /**
    * Default metrics for any message type based RPC ServerSide translators.
    */
-  public void addProtocolMessageMetrics(List<MetricGroupDisplay> metrics,
+  public void addProtocolMessageMetrics(
+      List<MetricGroupDisplay> metrics,
       String prefix,
-      Component.Type component,
-      ProtocolMessageEnum[] types) {
+      Component component,
+      Object[] types
+  ) {
 
     MetricGroupDisplay messageTypeCounters =
         new MetricGroupDisplay(component, "Message type counters");
-    for (ProtocolMessageEnum type : types) {
+    for (Object type : types) {
       String typeName = type.toString();
-      MetricDisplay metricDisplay = new MetricDisplay("Number of " + typeName,
-          prefix + "_" + PrometheusMetricsSink
-              .normalizeName(typeName));
+
+      Map<String, String> typeFilter = new HashMap<>();
+      typeFilter.put("type", typeName);
+      MetricDisplay metricDisplay =
+          new MetricDisplay("Number of " + typeName + " calls",
+              prefix + "_counter", typeFilter);
       messageTypeCounters.addMetrics(metricDisplay);
     }
     metrics.add(messageTypeCounters);
Expand Down