Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;

Expand Down Expand Up @@ -80,7 +81,12 @@ public void run() {
*/
private void publishReport() {
try {
context.addReport(getReport());
GeneratedMessage report = getReport();
if (report instanceof CommandStatusReportsProto) {
context.addIncrementalReport(report);
} else {
context.refreshFullReport(report);
}
} catch (IOException e) {
LOG.error("Exception while publishing report.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -40,7 +41,6 @@
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
Expand All @@ -63,10 +63,11 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import static java.lang.Math.min;
import org.apache.commons.collections.CollectionUtils;

import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval;

import org.apache.commons.collections.CollectionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -93,10 +94,6 @@ public class StateContext {
@VisibleForTesting
static final String CRL_STATUS_REPORT_PROTO_NAME =
CRLStatusReport.getDescriptor().getFullName();
// Accepted types of reports that can be queued to incrementalReportsQueue
private static final Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);

static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
Expand All @@ -121,6 +118,14 @@ public class StateContext {
private boolean shutdownOnError = false;
private boolean shutdownGracefully = false;
private final AtomicLong threadPoolNotAvailableCount;
// Endpoint -> ReportType -> Boolean of whether the full report should be
// queued in getFullReports call.
private final Map<InetSocketAddress,
Map<String, AtomicBoolean>> fullReportSendIndicator;
// List of supported full report types.
private final List<String> fullReportTypeList;
// ReportType -> Report.
private final Map<String, AtomicReference<GeneratedMessage>> type2Reports;

/**
* term of latest leader SCM, extract from SCMCommand.
Expand Down Expand Up @@ -167,6 +172,24 @@ public StateContext(ConfigurationSource conf,
lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0);
threadPoolNotAvailableCount = new AtomicLong(0);
fullReportSendIndicator = new HashMap<>();
fullReportTypeList = new ArrayList<>();
type2Reports = new HashMap<>();
initReportTypeCollection();
}

/**
* init related ReportType Collections.
*/
private void initReportTypeCollection(){
fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME);
type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports);
fullReportTypeList.add(NODE_REPORT_PROTO_NAME);
type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport);
fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME);
type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
}

/**
Expand Down Expand Up @@ -254,31 +277,46 @@ public boolean getShutdownOnError() {
*
* @param report report to be added
*/
public void addReport(GeneratedMessage report) {
public void addIncrementalReport(GeneratedMessage report) {
if (report == null) {
return;
}
final Descriptor descriptor = report.getDescriptorForType();
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
containerReports.set(report);
} else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
nodeReport.set(report);
} else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
pipelineReports.set(report);
} else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
synchronized (incrementalReportsQueue) {
for (InetSocketAddress endpoint : endpoints) {
incrementalReportsQueue.get(endpoint).add(report);
}
// in some case, we want to add a fullReportType message
// as an incremental message.
// see XceiverServerRatis#sendPipelineReport
synchronized (incrementalReportsQueue) {
for (InetSocketAddress endpoint : endpoints) {
incrementalReportsQueue.get(endpoint).add(report);
}
} else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) {
crlStatusReport.set(report);
} else {
}
}

/**
* refresh Full report.
*
* @param report report to be refreshed
*/
public void refreshFullReport(GeneratedMessage report) {
if (report == null) {
return;
}
final Descriptor descriptor = report.getDescriptorForType();
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
if (!fullReportTypeList.contains(reportType)) {
throw new IllegalArgumentException(
"Unidentified report message type: " + reportType);
"not full report message type: " + reportType);
}
type2Reports.get(reportType).set(report);
if (fullReportSendIndicator != null) {
for (Map<String, AtomicBoolean> mp : fullReportSendIndicator.values()) {
mp.get(reportType).set(true);
}
}
}

Expand All @@ -301,10 +339,6 @@ public void putBackReports(List<GeneratedMessage> reportsToPutBack,
Preconditions.checkState(descriptor != null);
final String reportType = descriptor.getFullName();
Preconditions.checkState(reportType != null);
if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
throw new IllegalArgumentException(
"Unaccepted report message type: " + reportType);
}
}
synchronized (incrementalReportsQueue) {
if (incrementalReportsQueue.containsKey(endpoint)){
Expand Down Expand Up @@ -340,23 +374,27 @@ List<GeneratedMessage> getIncrementalReports(
return reportsToReturn;
}

List<GeneratedMessage> getNonIncrementalReports() {
List<GeneratedMessage> getFullReports(
InetSocketAddress endpoint) {
Map<String, AtomicBoolean> mp = fullReportSendIndicator.get(endpoint);
List<GeneratedMessage> nonIncrementalReports = new LinkedList<>();
GeneratedMessage report = containerReports.get();
if (report != null) {
nonIncrementalReports.add(report);
}
report = nodeReport.get();
if (report != null) {
nonIncrementalReports.add(report);
}
report = pipelineReports.get();
if (report != null) {
nonIncrementalReports.add(report);
}
report = crlStatusReport.get();
if (report != null) {
nonIncrementalReports.add(report);
if (null != mp){
for (Map.Entry<String, AtomicBoolean> kv : mp.entrySet()) {
if (kv.getValue().get()) {
String reportType = kv.getKey();
final AtomicReference<GeneratedMessage> ref =
type2Reports.get(reportType);
if (ref == null) {
throw new RuntimeException(reportType + " is not a valid full "
+ "report type!");
}
final GeneratedMessage msg = ref.get();
if (msg != null) {
nonIncrementalReports.add(msg);
mp.get(reportType).set(false);
}
}
}
}
return nonIncrementalReports;
}
Expand All @@ -372,7 +410,7 @@ public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
if (maxLimit < 0) {
throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit);
}
List<GeneratedMessage> reports = getNonIncrementalReports();
List<GeneratedMessage> reports = getFullReports(endpoint);
if (maxLimit <= reports.size()) {
return reports.subList(0, maxLimit);
} else {
Expand Down Expand Up @@ -800,6 +838,11 @@ public void addEndpoint(InetSocketAddress endpoint) {
this.containerActions.put(endpoint, new LinkedList<>());
this.pipelineActions.put(endpoint, new LinkedList<>());
this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
Map<String, AtomicBoolean> mp = new HashMap<>();
fullReportTypeList.forEach(e -> {
mp.putIfAbsent(e, new AtomicBoolean(true));
});
this.fullReportSendIndicator.putIfAbsent(endpoint, mp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
} catch (IOException ex) {
Preconditions.checkState(requestBuilder != null);
// put back the reports which failed to be sent
putBackReports(requestBuilder);
putBackIncrementalReports(requestBuilder);
rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
Expand All @@ -159,7 +159,8 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
}

// TODO: Make it generic.
private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
private void putBackIncrementalReports(
SCMHeartbeatRequestProto.Builder requestBuilder) {
List<GeneratedMessage> reports = new LinkedList<>();
// We only put back CommandStatusReports and IncrementalContainerReport
// because those are incremental. Container/Node/PipelineReport are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,8 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
private void sendPipelineReport() {
if (context != null) {
// TODO: Send IncrementalPipelineReport instead of full PipelineReport
context.addReport(context.getParent().getContainer().getPipelineReport());
context.addIncrementalReport(
context.getParent().getContainer().getPipelineReport());
context.getParent().triggerHeartbeat();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public OzoneContainer(
.newBuilder()
.addReport(containerReplicaProto)
.build();
context.addReport(icr);
context.addIncrementalReport(icr);
context.getParent().triggerHeartbeat();
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testPublishReport() throws InterruptedException {
Thread.sleep(150);
executorService.shutdown();
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
verify(dummyContext, times(1)).addReport(null);
verify(dummyContext, times(1)).refreshFullReport(null);
// After executor shutdown, no new reports should be published
Thread.sleep(100);
Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount);
Expand Down
Loading