Merged
22 commits
2cb1635
HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slo…
smengcl Nov 19, 2020
3e53bf8
Checkstyle.
smengcl Nov 19, 2020
024a4f8
Add mock-maker-inline so UT can mock final classes.
smengcl Nov 19, 2020
22846f4
Remove throw exception in addReport.
smengcl Nov 19, 2020
878d271
Fix existing UT TestStateContext#testReportAPIs.
smengcl Nov 19, 2020
5b2bbd3
Remove unused imports.
smengcl Nov 19, 2020
2fcb2b6
Work around mockito bug in UT TestCreatePipelineCommandHandler: cause…
smengcl Nov 20, 2020
c403208
Add UT testContainerNodePipelineReportAPIs.
smengcl Nov 20, 2020
23d25cb
Clean up.
smengcl Nov 20, 2020
50b9a5e
@VisibleForTesting for get container/node/pipeline reports as they ar…
smengcl Nov 23, 2020
6f3ea18
- Renamed `reports` to `incrementalReportsQueue` to better reflect th…
smengcl Nov 24, 2020
9718488
Make containerReports, nodeReport, pipelineReports atomic and final.
smengcl Nov 24, 2020
7988203
Checkstyle.
smengcl Nov 24, 2020
fdc5823
Clean up.
smengcl Nov 24, 2020
3a32e8b
Remove assertion in putBackReports as requestBuilder might include Co…
smengcl Nov 30, 2020
84e8f74
Empty commit to retrigger CI.
smengcl Dec 1, 2020
02298d9
Empty commit to retrigger CI.
smengcl Dec 7, 2020
63a89ab
Retrigger.
smengcl Dec 7, 2020
c64f247
Merge branch 'master' into HDDS-4404-v2
smengcl Dec 8, 2020
34283ca
Allow null as input to StateContext#addReport() as this blocked UT Te…
smengcl Dec 9, 2020
ab37c22
Retrigger CI
smengcl Dec 10, 2020
f0dca47
trigger new CI check
adoroszlai Dec 12, 2020
StateContext.java
@@ -32,15 +32,23 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
@@ -63,6 +71,27 @@
* Current Context of State Machine.
*/
public class StateContext {

@VisibleForTesting
final static String CONTAINER_REPORTS_PROTO_NAME =
ContainerReportsProto.getDescriptor().getFullName();
@VisibleForTesting
final static String NODE_REPORT_PROTO_NAME =
NodeReportProto.getDescriptor().getFullName();
@VisibleForTesting
final static String PIPELINE_REPORTS_PROTO_NAME =
PipelineReportsProto.getDescriptor().getFullName();
@VisibleForTesting
final static String COMMAND_STATUS_REPORTS_PROTO_NAME =
CommandStatusReportsProto.getDescriptor().getFullName();
@VisibleForTesting
final static String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME =
IncrementalContainerReportProto.getDescriptor().getFullName();
// Accepted types of reports that can be queued to incrementalReportsQueue
private final static Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);

static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
private final Queue<SCMCommand> commandQueue;
@@ -72,7 +101,13 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final ConfigurationSource conf;
private final Set<InetSocketAddress> endpoints;
private final Map<InetSocketAddress, List<GeneratedMessage>> reports;
// Only the latest full report of each type is kept
private final AtomicReference<GeneratedMessage> containerReports;
Contributor commented:
You may make them volatile, which is simpler but thread-safe as well.

Contributor (Author) replied:
Thanks for the tip. I will keep them atomic for now, as the reports are not updated so frequently on DNs (low performance impact).
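For context on this exchange, here is a minimal sketch contrasting the two options; the class and method names (LatestReportHolder, publishVolatile, publishAtomic) are illustrative and not part of the patch. For a single latest-value slot that is only read and overwritten, a volatile field and an AtomicReference give the same visibility guarantees for plain reads and writes; AtomicReference additionally offers atomic read-modify-write operations (compareAndSet, getAndSet) should conditional updates ever be needed.

import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: Object stands in for the GeneratedMessage type held by StateContext.
class LatestReportHolder {
  // Reviewer's suggestion: a volatile field gives visibility for plain reads and writes.
  private volatile Object volatileLatest;

  // What the patch keeps: AtomicReference gives the same visibility for get/set,
  // plus atomic read-modify-write operations if they are ever needed.
  private final AtomicReference<Object> atomicLatest = new AtomicReference<>();

  void publishVolatile(Object report) {
    volatileLatest = report;      // last writer wins; readers see the newest value
  }

  Object readVolatile() {
    return volatileLatest;
  }

  void publishAtomic(Object report) {
    atomicLatest.set(report);     // equivalent visibility to the volatile write above
  }

  Object readAtomic() {
    return atomicLatest.get();
  }
}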

private final AtomicReference<GeneratedMessage> nodeReport;
private final AtomicReference<GeneratedMessage> pipelineReports;
// Incremental reports are queued in the map below
private final Map<InetSocketAddress, List<GeneratedMessage>>
incrementalReportsQueue;
private final Map<InetSocketAddress, Queue<ContainerAction>> containerActions;
private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
@@ -102,7 +137,10 @@ public StateContext(ConfigurationSource conf,
this.parent = parent;
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
reports = new HashMap<>();
incrementalReportsQueue = new HashMap<>();
containerReports = new AtomicReference<>();
nodeReport = new AtomicReference<>();
pipelineReports = new AtomicReference<>();
endpoints = new HashSet<>();
containerActions = new HashMap<>();
pipelineActions = new HashMap<>();
@@ -190,17 +228,34 @@ void setShutdownGracefully() {
public boolean getShutdownOnError() {
return shutdownOnError;
}

/**
* Adds the report to report queue.
*
* @param report report to be added
*/
public void addReport(GeneratedMessage report) {
if (report != null) {
synchronized (reports) {
for (InetSocketAddress endpoint : endpoints) {
reports.get(endpoint).add(report);
if (report == null) {
return;
}
final Descriptor descriptor = report.getDescriptorForType();
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
for (InetSocketAddress endpoint : endpoints) {
if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
containerReports.set(report);
} else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
nodeReport.set(report);
} else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
pipelineReports.set(report);
} else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
synchronized (incrementalReportsQueue) {
incrementalReportsQueue.get(endpoint).add(report);
}
} else {
throw new IllegalArgumentException(
"Unidentified report message type: " + reportType);
}
}
}
@@ -214,9 +269,24 @@ public void addReport(GeneratedMessage report) {
*/
public void putBackReports(List<GeneratedMessage> reportsToPutBack,
InetSocketAddress endpoint) {
synchronized (reports) {
if (reports.containsKey(endpoint)){
reports.get(endpoint).addAll(0, reportsToPutBack);
if (LOG.isDebugEnabled()) {
LOG.debug("endpoint: {}, size of reportsToPutBack: {}",
endpoint, reportsToPutBack.size());
}
// We don't expect too many reports to be put back
for (GeneratedMessage report : reportsToPutBack) {
final Descriptor descriptor = report.getDescriptorForType();
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
throw new IllegalArgumentException(
"Unaccepted report message type: " + reportType);
}
}
synchronized (incrementalReportsQueue) {
if (incrementalReportsQueue.containsKey(endpoint)){
incrementalReportsQueue.get(endpoint).addAll(0, reportsToPutBack);
}
}
}
@@ -232,6 +302,22 @@ public List<GeneratedMessage> getAllAvailableReports(
return getReports(endpoint, Integer.MAX_VALUE);
}

List<GeneratedMessage> getIncrementalReports(
InetSocketAddress endpoint, int maxLimit) {
List<GeneratedMessage> reportsToReturn = new LinkedList<>();
synchronized (incrementalReportsQueue) {
List<GeneratedMessage> reportsForEndpoint =
incrementalReportsQueue.get(endpoint);
if (reportsForEndpoint != null) {
List<GeneratedMessage> tempList = reportsForEndpoint.subList(
0, min(reportsForEndpoint.size(), maxLimit));
reportsToReturn.addAll(tempList);
tempList.clear();
}
}
return reportsToReturn;
}

/**
* Returns available reports from the report queue with a max limit on
* list size, or empty list if the queue is empty.
@@ -240,15 +326,19 @@ public List<GeneratedMessage> getAllAvailableReports(
*/
public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
int maxLimit) {
List<GeneratedMessage> reportsToReturn = new LinkedList<>();
synchronized (reports) {
List<GeneratedMessage> reportsForEndpoint = reports.get(endpoint);
if (reportsForEndpoint != null) {
List<GeneratedMessage> tempList = reportsForEndpoint.subList(
0, min(reportsForEndpoint.size(), maxLimit));
reportsToReturn.addAll(tempList);
tempList.clear();
}
List<GeneratedMessage> reportsToReturn =
getIncrementalReports(endpoint, maxLimit);
GeneratedMessage report = containerReports.get();
if (report != null) {
reportsToReturn.add(report);
}
report = nodeReport.get();
if (report != null) {
reportsToReturn.add(report);
}
report = pipelineReports.get();
if (report != null) {
reportsToReturn.add(report);
}
return reportsToReturn;
}
@@ -580,7 +670,22 @@ public void addEndpoint(InetSocketAddress endpoint) {
this.endpoints.add(endpoint);
this.containerActions.put(endpoint, new LinkedList<>());
this.pipelineActions.put(endpoint, new LinkedList<>());
this.reports.put(endpoint, new LinkedList<>());
this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
}
}

@VisibleForTesting
public GeneratedMessage getContainerReports() {
return containerReports.get();
}

@VisibleForTesting
public GeneratedMessage getNodeReport() {
return nodeReport.get();
}

@VisibleForTesting
public GeneratedMessage getPipelineReports() {
return pipelineReports.get();
}
}
HeartbeatEndpointTask.java
@@ -142,15 +142,15 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat message :: {}", request.toString());
}
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
.sendHeartbeat(request);
processResponse(reponse, datanodeDetailsProto);
processResponse(response, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
Preconditions.checkState(requestBuilder != null);
// put back the reports which failed to be sent
putBackReports(requestBuilder);

rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
@@ -161,12 +161,9 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
// TODO: Make it generic.
private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
List<GeneratedMessage> reports = new LinkedList<>();
if (requestBuilder.hasContainerReport()) {
reports.add(requestBuilder.getContainerReport());
}
if (requestBuilder.hasNodeReport()) {
reports.add(requestBuilder.getNodeReport());
}
// We only put back CommandStatusReports and IncrementalContainerReport
// because those are incremental. Container/Node/PipelineReport are
// accumulative so we can keep only the latest of each.
if (requestBuilder.getCommandStatusReportsCount() != 0) {
reports.addAll(requestBuilder.getCommandStatusReportsList());
}
@@ -194,6 +191,7 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
} else {
requestBuilder.setField(descriptor, report);
}
break;
}
}
}
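The comment in putBackReports above captures the core design change of this patch: full reports (container, node, pipeline) are snapshots, so only the latest one needs to be kept and nothing is lost if a send fails, while incremental reports (command status, incremental container) are deltas that must be re-queued on failure. Below is a minimal sketch of that split under assumed names (ReportBuffer, addFull, addIncremental, and putBackAfterFailure are illustrative, not the actual Ozone API); the point is that the full-report slot stays bounded at one entry per type even when the SCM or Recon server is slow, which is the OOM scenario described in HDDS-4404.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: simplified stand-ins for the two report categories handled by the patch.
class ReportBuffer<T> {
  // Full report: a snapshot; the newest value supersedes older ones, so memory
  // stays bounded at one entry per type no matter how slow the server is.
  private final AtomicReference<T> latestFullReport = new AtomicReference<>();

  // Incremental reports: deltas that must all eventually reach the server, so they are queued.
  private final Deque<T> incrementalQueue = new ArrayDeque<>();

  void addFull(T snapshot) {
    latestFullReport.set(snapshot);
  }

  synchronized void addIncremental(T delta) {
    incrementalQueue.addLast(delta);
  }

  // On a failed heartbeat, only the unsent deltas are put back at the front of the queue;
  // a full snapshot is not re-queued because the next snapshot replaces it anyway.
  synchronized void putBackAfterFailure(List<T> unsentDeltas) {
    for (int i = unsentDeltas.size() - 1; i >= 0; i--) {
      incrementalQueue.addFirst(unsentDeltas.get(i));
    }
  }
}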