diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java index e6b410628693..8d4820e60f62 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -80,7 +81,12 @@ public void run() { */ private void publishReport() { try { - context.addReport(getReport()); + GeneratedMessage report = getReport(); + if (report instanceof CommandStatusReportsProto) { + context.addIncrementalReport(report); + } else { + context.refreshFullReport(report); + } } catch (IOException e) { LOG.error("Exception while publishing report.", e); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 5c3aaa9119aa..ca734681d77f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -32,6 +32,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -40,7 +41,6 @@ import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; import com.google.protobuf.Descriptors.Descriptor; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport; @@ -63,10 +63,11 @@ import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import static java.lang.Math.min; -import org.apache.commons.collections.CollectionUtils; - import static org.apache.hadoop.hdds.utils.HddsServerUtil.getLogWarnInterval; import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmHeartbeatInterval; + +import org.apache.commons.collections.CollectionUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,10 +94,6 @@ public class StateContext { @VisibleForTesting static final String CRL_STATUS_REPORT_PROTO_NAME = CRLStatusReport.getDescriptor().getFullName(); - // Accepted types of reports that can be queued to incrementalReportsQueue - private static final Set ACCEPTED_INCREMENTAL_REPORT_TYPE_SET = - Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME, - INCREMENTAL_CONTAINER_REPORT_PROTO_NAME); static final Logger LOG = LoggerFactory.getLogger(StateContext.class); @@ -121,6 +118,14 @@ public class StateContext { private boolean shutdownOnError = false; private boolean shutdownGracefully = false; private final AtomicLong threadPoolNotAvailableCount; + // Endpoint -> ReportType -> Boolean of whether the full report should be + // queued in getFullReports call. + private final Map> fullReportSendIndicator; + // List of supported full report types. + private final List fullReportTypeList; + // ReportType -> Report. + private final Map> type2Reports; /** * term of latest leader SCM, extract from SCMCommand. @@ -167,6 +172,24 @@ public StateContext(ConfigurationSource conf, lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); threadPoolNotAvailableCount = new AtomicLong(0); + fullReportSendIndicator = new HashMap<>(); + fullReportTypeList = new ArrayList<>(); + type2Reports = new HashMap<>(); + initReportTypeCollection(); + } + + /** + * init related ReportType Collections. + */ + private void initReportTypeCollection(){ + fullReportTypeList.add(CONTAINER_REPORTS_PROTO_NAME); + type2Reports.put(CONTAINER_REPORTS_PROTO_NAME, containerReports); + fullReportTypeList.add(NODE_REPORT_PROTO_NAME); + type2Reports.put(NODE_REPORT_PROTO_NAME, nodeReport); + fullReportTypeList.add(PIPELINE_REPORTS_PROTO_NAME); + type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports); + fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME); + type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport); } /** @@ -254,7 +277,7 @@ public boolean getShutdownOnError() { * * @param report report to be added */ - public void addReport(GeneratedMessage report) { + public void addIncrementalReport(GeneratedMessage report) { if (report == null) { return; } @@ -262,23 +285,38 @@ public void addReport(GeneratedMessage report) { Preconditions.checkState(descriptor != null); final String reportType = descriptor.getFullName(); Preconditions.checkState(reportType != null); - if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) { - containerReports.set(report); - } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) { - nodeReport.set(report); - } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) { - pipelineReports.set(report); - } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) { - synchronized (incrementalReportsQueue) { - for (InetSocketAddress endpoint : endpoints) { - incrementalReportsQueue.get(endpoint).add(report); - } + // in some case, we want to add a fullReportType message + // as an incremental message. + // see XceiverServerRatis#sendPipelineReport + synchronized (incrementalReportsQueue) { + for (InetSocketAddress endpoint : endpoints) { + incrementalReportsQueue.get(endpoint).add(report); } - } else if(reportType.equals(CRL_STATUS_REPORT_PROTO_NAME)) { - crlStatusReport.set(report); - } else { + } + } + + /** + * refresh Full report. + * + * @param report report to be refreshed + */ + public void refreshFullReport(GeneratedMessage report) { + if (report == null) { + return; + } + final Descriptor descriptor = report.getDescriptorForType(); + Preconditions.checkState(descriptor != null); + final String reportType = descriptor.getFullName(); + Preconditions.checkState(reportType != null); + if (!fullReportTypeList.contains(reportType)) { throw new IllegalArgumentException( - "Unidentified report message type: " + reportType); + "not full report message type: " + reportType); + } + type2Reports.get(reportType).set(report); + if (fullReportSendIndicator != null) { + for (Map mp : fullReportSendIndicator.values()) { + mp.get(reportType).set(true); + } } } @@ -301,10 +339,6 @@ public void putBackReports(List reportsToPutBack, Preconditions.checkState(descriptor != null); final String reportType = descriptor.getFullName(); Preconditions.checkState(reportType != null); - if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) { - throw new IllegalArgumentException( - "Unaccepted report message type: " + reportType); - } } synchronized (incrementalReportsQueue) { if (incrementalReportsQueue.containsKey(endpoint)){ @@ -340,23 +374,27 @@ List getIncrementalReports( return reportsToReturn; } - List getNonIncrementalReports() { + List getFullReports( + InetSocketAddress endpoint) { + Map mp = fullReportSendIndicator.get(endpoint); List nonIncrementalReports = new LinkedList<>(); - GeneratedMessage report = containerReports.get(); - if (report != null) { - nonIncrementalReports.add(report); - } - report = nodeReport.get(); - if (report != null) { - nonIncrementalReports.add(report); - } - report = pipelineReports.get(); - if (report != null) { - nonIncrementalReports.add(report); - } - report = crlStatusReport.get(); - if (report != null) { - nonIncrementalReports.add(report); + if (null != mp){ + for (Map.Entry kv : mp.entrySet()) { + if (kv.getValue().get()) { + String reportType = kv.getKey(); + final AtomicReference ref = + type2Reports.get(reportType); + if (ref == null) { + throw new RuntimeException(reportType + " is not a valid full " + + "report type!"); + } + final GeneratedMessage msg = ref.get(); + if (msg != null) { + nonIncrementalReports.add(msg); + mp.get(reportType).set(false); + } + } + } } return nonIncrementalReports; } @@ -372,7 +410,7 @@ public List getReports(InetSocketAddress endpoint, if (maxLimit < 0) { throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit); } - List reports = getNonIncrementalReports(); + List reports = getFullReports(endpoint); if (maxLimit <= reports.size()) { return reports.subList(0, maxLimit); } else { @@ -800,6 +838,11 @@ public void addEndpoint(InetSocketAddress endpoint) { this.containerActions.put(endpoint, new LinkedList<>()); this.pipelineActions.put(endpoint, new LinkedList<>()); this.incrementalReportsQueue.put(endpoint, new LinkedList<>()); + Map mp = new HashMap<>(); + fullReportTypeList.forEach(e -> { + mp.putIfAbsent(e, new AtomicBoolean(true)); + }); + this.fullReportSendIndicator.putIfAbsent(endpoint, mp); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 60b79785b843..cb65e371c845 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -150,7 +150,7 @@ public EndpointStateMachine.EndPointStates call() throws Exception { } catch (IOException ex) { Preconditions.checkState(requestBuilder != null); // put back the reports which failed to be sent - putBackReports(requestBuilder); + putBackIncrementalReports(requestBuilder); rpcEndpoint.logIfNeeded(ex); } finally { rpcEndpoint.unlock(); @@ -159,7 +159,8 @@ public EndpointStateMachine.EndPointStates call() throws Exception { } // TODO: Make it generic. - private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) { + private void putBackIncrementalReports( + SCMHeartbeatRequestProto.Builder requestBuilder) { List reports = new LinkedList<>(); // We only put back CommandStatusReports and IncrementalContainerReport // because those are incremental. Container/Node/PipelineReport are diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 6fd2706a8b34..3a2aec98ed32 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -917,7 +917,8 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId, private void sendPipelineReport() { if (context != null) { // TODO: Send IncrementalPipelineReport instead of full PipelineReport - context.addReport(context.getParent().getContainer().getPipelineReport()); + context.addIncrementalReport( + context.getParent().getContainer().getPipelineReport()); context.getParent().triggerHeartbeat(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 736e3d6852b8..2312c0b6cc0e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -130,7 +130,7 @@ public OzoneContainer( .newBuilder() .addReport(containerReplicaProto) .build(); - context.addReport(icr); + context.addIncrementalReport(icr); context.getParent().triggerHeartbeat(); }; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java index c97d703d00fc..83e44d3adf83 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -133,7 +133,7 @@ public void testPublishReport() throws InterruptedException { Thread.sleep(150); executorService.shutdown(); Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount); - verify(dummyContext, times(1)).addReport(null); + verify(dummyContext, times(1)).refreshFullReport(null); // After executor shutdown, no new reports should be published Thread.sleep(100); Assert.assertEquals(1, ((DummyReportPublisher) publisher).getReportCount); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index d152f4d6d2bb..6d0ad16f0406 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; @@ -106,35 +106,7 @@ public void testPutBackReports() { // getReports dequeues incremental reports expectedReportCount.clear(); - // Case 2: Attempt to put back a full report - - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - try { - ctx.putBackReports(Collections.singletonList( - newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME)), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - - // Case 3: Put back mixed types of incremental reports - + // Case 2: Put back mixed types of incremental reports ctx.putBackReports(Arrays.asList( newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME), newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME), @@ -152,31 +124,6 @@ public void testPutBackReports() { checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); // getReports dequeues incremental reports expectedReportCount.clear(); - - // Case 4: Attempt to put back mixed types of full reports - - try { - ctx.putBackReports(Arrays.asList( - newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME), - newMockReport(StateContext.NODE_REPORT_PROTO_NAME), - newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME), - newMockReport(StateContext.CRL_STATUS_REPORT_PROTO_NAME) - ), scm1); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } - - // Case 5: Attempt to put back mixed full and incremental reports - - try { - ctx.putBackReports(Arrays.asList( - newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME), - newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME), - newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME) - ), scm2); - fail("Should throw exception when putting back unaccepted reports!"); - } catch (IllegalArgumentException ignored) { - } } @Test @@ -197,8 +144,48 @@ public void testReportQueueWithAddReports() { Map expectedReportCount = new HashMap<>(); + // Add a bunch of ContainerReports + batchRefreshfullReports(ctx, + StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // every time getAllAvailableReports is called , if we want to get a full + // report of a certain type, we must call "batchRefreshfullReports" for + // this type to refresh. + expectedReportCount.remove(StateContext.CONTAINER_REPORTS_PROTO_NAME); + + // Add a bunch of NodeReport + batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + expectedReportCount.remove(StateContext.NODE_REPORT_PROTO_NAME); + + // Add a bunch of PipelineReports + batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + expectedReportCount.remove(StateContext.PIPELINE_REPORTS_PROTO_NAME); + + // Add a bunch of CommandStatusReports + batchAddIncrementalReport(ctx, + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128); + expectedReportCount.put( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128); + // Should keep all of them + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // getReports dequeues incremental reports + expectedReportCount.remove( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME); + // Add a bunch of IncrementalContainerReport - batchAddReports(ctx, + batchAddIncrementalReport(ctx, StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); // Should keep all of them expectedReportCount.put( @@ -210,9 +197,16 @@ public void testReportQueueWithAddReports() { StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME); } - void batchAddReports(StateContext ctx, String reportName, int count) { + void batchRefreshfullReports(StateContext ctx, String reportName, int count) { + for (int i = 0; i < count; i++) { + ctx.refreshFullReport(newMockReport(reportName)); + } + } + + void batchAddIncrementalReport(StateContext ctx, + String reportName, int count) { for (int i = 0; i < count; i++) { - ctx.addReport(newMockReport(reportName)); + ctx.addIncrementalReport(newMockReport(reportName)); } } @@ -243,7 +237,7 @@ public void testContainerNodePipelineReportAPIs() { assertNull(context1.getPipelineReports()); GeneratedMessage containerReports = newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME); - context1.addReport(containerReports); + context1.refreshFullReport(containerReports); assertNotNull(context1.getContainerReports()); assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME, @@ -255,7 +249,7 @@ public void testContainerNodePipelineReportAPIs() { StateContext context2 = newStateContext(conf, datanodeStateMachineMock); GeneratedMessage nodeReport = newMockReport(StateContext.NODE_REPORT_PROTO_NAME); - context2.addReport(nodeReport); + context2.refreshFullReport(nodeReport); assertNull(context2.getContainerReports()); assertNotNull(context2.getNodeReport()); @@ -267,7 +261,7 @@ public void testContainerNodePipelineReportAPIs() { StateContext context3 = newStateContext(conf, datanodeStateMachineMock); GeneratedMessage pipelineReports = newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME); - context3.addReport(pipelineReports); + context3.refreshFullReport(pipelineReports); assertNull(context3.getContainerReports()); assertNull(context3.getNodeReport()); @@ -311,7 +305,7 @@ public void testReportAPIs() { newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME); // Try to add report with zero endpoint. Should not be stored. - stateContext.addReport(generatedMessage); + stateContext.addIncrementalReport(generatedMessage); assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty()); // Add 2 scm endpoints. @@ -319,7 +313,7 @@ public void testReportAPIs() { stateContext.addEndpoint(scm2); // Add report. Should be added to all endpoints. - stateContext.addReport(generatedMessage); + stateContext.addIncrementalReport(generatedMessage); List allAvailableReports = stateContext.getAllAvailableReports(scm1); assertEquals(1, allAvailableReports.size()); @@ -462,9 +456,9 @@ public void testIsThreadPoolAvailable() throws Exception { // task num greater than pool size for (int i = 0; i < threadPoolSize; i++) { - executorService.submit(() -> futureOne.get()); + executorService.submit((Callable) futureOne::get); } - executorService.submit(() -> futureTwo.get()); + executorService.submit((Callable) futureTwo::get); Assert.assertFalse(stateContext.isThreadPoolAvailable(executorService)); @@ -483,8 +477,8 @@ public void doesNotAwaitWithoutExecute() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(1); CompletableFuture future = new CompletableFuture<>(); - executorService.submit(() -> future.get()); - executorService.submit(() -> future.get()); + executorService.submit((Callable) future::get); + executorService.submit((Callable) future::get); StateContext subject = new StateContext(new OzoneConfiguration(), DatanodeStates.INIT, mock(DatanodeStateMachine.class)) { @@ -549,11 +543,13 @@ public void testGetReports() { Map expectedReportCount = new HashMap<>(); // Add a bunch of ContainerReports - batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); - batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); - batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); - batchAddReports(ctx, StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128); - batchAddReports(ctx, + batchRefreshfullReports(ctx, + StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); + batchRefreshfullReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); + batchRefreshfullReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); + batchRefreshfullReports(ctx, + StateContext.CRL_STATUS_REPORT_PROTO_NAME, 128); + batchAddIncrementalReport(ctx, StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); // Should only keep the latest one diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index 9b238a188eb6..0f215f4238c9 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -96,7 +96,7 @@ public void testheartbeatWithNodeReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(NodeReportProto.getDefaultInstance()); + context.refreshFullReport(NodeReportProto.getDefaultInstance()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assert.assertTrue(heartbeat.hasDatanodeDetails()); @@ -128,7 +128,7 @@ public void testheartbeatWithContainerReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(ContainerReportsProto.getDefaultInstance()); + context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assert.assertTrue(heartbeat.hasDatanodeDetails()); @@ -160,7 +160,8 @@ public void testheartbeatWithCommandStatusReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(CommandStatusReportsProto.getDefaultInstance()); + context.addIncrementalReport( + CommandStatusReportsProto.getDefaultInstance()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); Assert.assertTrue(heartbeat.hasDatanodeDetails()); @@ -224,9 +225,10 @@ public void testheartbeatWithAllReports() throws Exception { HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( conf, context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); - context.addReport(NodeReportProto.getDefaultInstance()); - context.addReport(ContainerReportsProto.getDefaultInstance()); - context.addReport(CommandStatusReportsProto.getDefaultInstance()); + context.refreshFullReport(NodeReportProto.getDefaultInstance()); + context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); + context.addIncrementalReport( + CommandStatusReportsProto.getDefaultInstance()); context.addContainerAction(getContainerAction()); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index 11f6f467a4da..a3ce0856a1dd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -392,7 +392,8 @@ private void verifyPendingDeleteEvent() logCapturer.clearOutput(); cluster.getHddsDatanodes().get(0) - .getDatanodeStateMachine().getContext().addReport(dummyReport); + .getDatanodeStateMachine().getContext(). + addIncrementalReport(dummyReport); cluster.getHddsDatanodes().get(0) .getDatanodeStateMachine().triggerHeartbeat(); // wait for event to be handled by event handler