diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 4cd769f4d245..f87561a70a60 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -32,15 +32,23 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.protobuf.Descriptors.Descriptor; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; @@ -63,6 +71,27 @@ * Current Context of State Machine. */ public class StateContext { + + @VisibleForTesting + final static String CONTAINER_REPORTS_PROTO_NAME = + ContainerReportsProto.getDescriptor().getFullName(); + @VisibleForTesting + final static String NODE_REPORT_PROTO_NAME = + NodeReportProto.getDescriptor().getFullName(); + @VisibleForTesting + final static String PIPELINE_REPORTS_PROTO_NAME = + PipelineReportsProto.getDescriptor().getFullName(); + @VisibleForTesting + final static String COMMAND_STATUS_REPORTS_PROTO_NAME = + CommandStatusReportsProto.getDescriptor().getFullName(); + @VisibleForTesting + final static String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME = + IncrementalContainerReportProto.getDescriptor().getFullName(); + // Accepted types of reports that can be queued to incrementalReportsQueue + private final static Set ACCEPTED_INCREMENTAL_REPORT_TYPE_SET = + Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME, + INCREMENTAL_CONTAINER_REPORT_PROTO_NAME); + static final Logger LOG = LoggerFactory.getLogger(StateContext.class); private final Queue commandQueue; @@ -72,7 +101,13 @@ public class StateContext { private final AtomicLong stateExecutionCount; private final ConfigurationSource conf; private final Set endpoints; - private final Map> reports; + // Only the latest full report of each type is kept + private final AtomicReference containerReports; + private final AtomicReference nodeReport; + private final AtomicReference pipelineReports; + // Incremental reports are queued in the map below + private final Map> + incrementalReportsQueue; private final Map> containerActions; private final Map> pipelineActions; private DatanodeStateMachine.DatanodeStates state; @@ -102,7 +137,10 @@ public StateContext(ConfigurationSource conf, this.parent = parent; commandQueue = new LinkedList<>(); cmdStatusMap = new ConcurrentHashMap<>(); - reports = new HashMap<>(); + incrementalReportsQueue = new HashMap<>(); + containerReports = new AtomicReference<>(); + nodeReport = new AtomicReference<>(); + pipelineReports = new AtomicReference<>(); endpoints = new HashSet<>(); containerActions = new HashMap<>(); pipelineActions = new HashMap<>(); @@ -190,17 +228,34 @@ void setShutdownGracefully() { public boolean getShutdownOnError() { return shutdownOnError; } + /** * Adds the report to report queue. * * @param report report to be added */ public void addReport(GeneratedMessage report) { - if (report != null) { - synchronized (reports) { - for (InetSocketAddress endpoint : endpoints) { - reports.get(endpoint).add(report); + if (report == null) { + return; + } + final Descriptor descriptor = report.getDescriptorForType(); + Preconditions.checkState(descriptor != null); + final String reportType = descriptor.getFullName(); + Preconditions.checkState(reportType != null); + for (InetSocketAddress endpoint : endpoints) { + if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) { + containerReports.set(report); + } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) { + nodeReport.set(report); + } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) { + pipelineReports.set(report); + } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) { + synchronized (incrementalReportsQueue) { + incrementalReportsQueue.get(endpoint).add(report); } + } else { + throw new IllegalArgumentException( + "Unidentified report message type: " + reportType); } } } @@ -214,9 +269,24 @@ public void addReport(GeneratedMessage report) { */ public void putBackReports(List reportsToPutBack, InetSocketAddress endpoint) { - synchronized (reports) { - if (reports.containsKey(endpoint)){ - reports.get(endpoint).addAll(0, reportsToPutBack); + if (LOG.isDebugEnabled()) { + LOG.debug("endpoint: {}, size of reportsToPutBack: {}", + endpoint, reportsToPutBack.size()); + } + // We don't expect too much reports to be put back + for (GeneratedMessage report : reportsToPutBack) { + final Descriptor descriptor = report.getDescriptorForType(); + Preconditions.checkState(descriptor != null); + final String reportType = descriptor.getFullName(); + Preconditions.checkState(reportType != null); + if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) { + throw new IllegalArgumentException( + "Unaccepted report message type: " + reportType); + } + } + synchronized (incrementalReportsQueue) { + if (incrementalReportsQueue.containsKey(endpoint)){ + incrementalReportsQueue.get(endpoint).addAll(0, reportsToPutBack); } } } @@ -232,6 +302,22 @@ public List getAllAvailableReports( return getReports(endpoint, Integer.MAX_VALUE); } + List getIncrementalReports( + InetSocketAddress endpoint, int maxLimit) { + List reportsToReturn = new LinkedList<>(); + synchronized (incrementalReportsQueue) { + List reportsForEndpoint = + incrementalReportsQueue.get(endpoint); + if (reportsForEndpoint != null) { + List tempList = reportsForEndpoint.subList( + 0, min(reportsForEndpoint.size(), maxLimit)); + reportsToReturn.addAll(tempList); + tempList.clear(); + } + } + return reportsToReturn; + } + /** * Returns available reports from the report queue with a max limit on * list size, or empty list if the queue is empty. @@ -240,15 +326,19 @@ public List getAllAvailableReports( */ public List getReports(InetSocketAddress endpoint, int maxLimit) { - List reportsToReturn = new LinkedList<>(); - synchronized (reports) { - List reportsForEndpoint = reports.get(endpoint); - if (reportsForEndpoint != null) { - List tempList = reportsForEndpoint.subList( - 0, min(reportsForEndpoint.size(), maxLimit)); - reportsToReturn.addAll(tempList); - tempList.clear(); - } + List reportsToReturn = + getIncrementalReports(endpoint, maxLimit); + GeneratedMessage report = containerReports.get(); + if (report != null) { + reportsToReturn.add(report); + } + report = nodeReport.get(); + if (report != null) { + reportsToReturn.add(report); + } + report = pipelineReports.get(); + if (report != null) { + reportsToReturn.add(report); } return reportsToReturn; } @@ -580,7 +670,22 @@ public void addEndpoint(InetSocketAddress endpoint) { this.endpoints.add(endpoint); this.containerActions.put(endpoint, new LinkedList<>()); this.pipelineActions.put(endpoint, new LinkedList<>()); - this.reports.put(endpoint, new LinkedList<>()); + this.incrementalReportsQueue.put(endpoint, new LinkedList<>()); } } + + @VisibleForTesting + public GeneratedMessage getContainerReports() { + return containerReports.get(); + } + + @VisibleForTesting + public GeneratedMessage getNodeReport() { + return nodeReport.get(); + } + + @VisibleForTesting + public GeneratedMessage getPipelineReports() { + return pipelineReports.get(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index cea4295042b4..4e436c4a9cd4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -142,15 +142,15 @@ public EndpointStateMachine.EndPointStates call() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat message :: {}", request.toString()); } - SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() + SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint() .sendHeartbeat(request); - processResponse(reponse, datanodeDetailsProto); + processResponse(response, datanodeDetailsProto); rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); rpcEndpoint.zeroMissedCount(); } catch (IOException ex) { + Preconditions.checkState(requestBuilder != null); // put back the reports which failed to be sent putBackReports(requestBuilder); - rpcEndpoint.logIfNeeded(ex); } finally { rpcEndpoint.unlock(); @@ -161,12 +161,9 @@ public EndpointStateMachine.EndPointStates call() throws Exception { // TODO: Make it generic. private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) { List reports = new LinkedList<>(); - if (requestBuilder.hasContainerReport()) { - reports.add(requestBuilder.getContainerReport()); - } - if (requestBuilder.hasNodeReport()) { - reports.add(requestBuilder.getNodeReport()); - } + // We only put back CommandStatusReports and IncrementalContainerReport + // because those are incremental. Container/Node/PipelineReport are + // accumulative so we can keep only the latest of each. if (requestBuilder.getCommandStatusReportsCount() != 0) { reports.addAll(requestBuilder.getCommandStatusReportsList()); } @@ -194,6 +191,7 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { } else { requestBuilder.setField(descriptor, report); } + break; } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index d3032c3211f5..e9c39d3ee026 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -23,11 +23,19 @@ import static org.apache.hadoop.test.GenericTestUtils.waitFor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; @@ -37,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.Descriptors.Descriptor; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; @@ -53,6 +62,271 @@ */ public class TestStateContext { + /** + * Only accepted types of reports can be put back to the report queue. + */ + @Test + public void testPutBackReports() { + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeStateMachine datanodeStateMachineMock = + mock(DatanodeStateMachine.class); + + StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(), + datanodeStateMachineMock); + InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001); + ctx.addEndpoint(scm1); + InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001); + ctx.addEndpoint(scm2); + + Map expectedReportCount = new HashMap<>(); + + // Case 1: Put back an incremental report + + ctx.putBackReports(Collections.singletonList(newMockReport( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME)), scm1); + // scm2 report queue should be empty + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // Check scm1 queue + expectedReportCount.put( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + // getReports dequeues incremental reports + expectedReportCount.clear(); + + ctx.putBackReports(Collections.singletonList(newMockReport( + StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)), scm2); + // scm1 report queue should be empty + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + // Check scm2 queue + expectedReportCount.put( + StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // getReports dequeues incremental reports + expectedReportCount.clear(); + + // Case 2: Attempt to put back a full report + + try { + ctx.putBackReports(Collections.singletonList( + newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1); + fail("Should throw exception when putting back unaccepted reports!"); + } catch (IllegalArgumentException ignored) { + } + try { + ctx.putBackReports(Collections.singletonList( + newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2); + fail("Should throw exception when putting back unaccepted reports!"); + } catch (IllegalArgumentException ignored) { + } + try { + ctx.putBackReports(Collections.singletonList( + newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1); + fail("Should throw exception when putting back unaccepted reports!"); + } catch (IllegalArgumentException ignored) { + } + + // Case 3: Put back mixed types of incremental reports + + ctx.putBackReports(Arrays.asList( + newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME), + newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME), + newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME), + newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME), + newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME) + ), scm1); + // scm2 report queue should be empty + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // Check scm1 queue + expectedReportCount.put( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 2); + expectedReportCount.put( + StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 3); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + // getReports dequeues incremental reports + expectedReportCount.clear(); + + // Case 4: Attempt to put back mixed types of full reports + + try { + ctx.putBackReports(Arrays.asList( + newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME), + newMockReport(StateContext.NODE_REPORT_PROTO_NAME), + newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME) + ), scm1); + fail("Should throw exception when putting back unaccepted reports!"); + } catch (IllegalArgumentException ignored) { + } + + // Case 5: Attempt to put back mixed full and incremental reports + + try { + ctx.putBackReports(Arrays.asList( + newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME), + newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME), + newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME) + ), scm2); + fail("Should throw exception when putting back unaccepted reports!"); + } catch (IllegalArgumentException ignored) { + } + } + + @Test + public void testReportQueueWithAddReports() { + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeStateMachine datanodeStateMachineMock = + mock(DatanodeStateMachine.class); + + StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(), + datanodeStateMachineMock); + InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001); + ctx.addEndpoint(scm1); + InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001); + ctx.addEndpoint(scm2); + // Check initial state + assertEquals(0, ctx.getAllAvailableReports(scm1).size()); + assertEquals(0, ctx.getAllAvailableReports(scm2).size()); + + Map expectedReportCount = new HashMap<>(); + + // Add a bunch of ContainerReports + batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + + // Add a bunch of NodeReport + batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + + // Add a bunch of PipelineReports + batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + + // Add a bunch of PipelineReports + batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128); + // Should only keep the latest one + expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + + // Add a bunch of CommandStatusReports + batchAddReports(ctx, + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128); + expectedReportCount.put( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128); + // Should keep all of them + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // getReports dequeues incremental reports + expectedReportCount.remove( + StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME); + + // Add a bunch of IncrementalContainerReport + batchAddReports(ctx, + StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); + // Should keep all of them + expectedReportCount.put( + StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128); + checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount); + checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount); + // getReports dequeues incremental reports + expectedReportCount.remove( + StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME); + } + + void batchAddReports(StateContext ctx, String reportName, int count) { + for (int i = 0; i < count; i++) { + ctx.addReport(newMockReport(reportName)); + } + } + + void checkReportCount(List reports, + Map expectedReportCount) { + Map reportCount = new HashMap<>(); + for (GeneratedMessage report : reports) { + final String reportName = report.getDescriptorForType().getFullName(); + reportCount.put(reportName, reportCount.getOrDefault(reportName, 0) + 1); + } + // Verify + assertEquals(expectedReportCount, reportCount); + } + + /** + * Check if Container, Node and Pipeline report APIs work as expected. + */ + @Test + public void testContainerNodePipelineReportAPIs() { + OzoneConfiguration conf = new OzoneConfiguration(); + DatanodeStateMachine datanodeStateMachineMock = + mock(DatanodeStateMachine.class); + + // ContainerReports + StateContext context1 = newStateContext(conf, datanodeStateMachineMock); + assertNull(context1.getContainerReports()); + assertNull(context1.getNodeReport()); + assertNull(context1.getPipelineReports()); + GeneratedMessage containerReports = + newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME); + context1.addReport(containerReports); + + assertNotNull(context1.getContainerReports()); + assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME, + context1.getContainerReports().getDescriptorForType().getFullName()); + assertNull(context1.getNodeReport()); + assertNull(context1.getPipelineReports()); + + // NodeReport + StateContext context2 = newStateContext(conf, datanodeStateMachineMock); + GeneratedMessage nodeReport = + newMockReport(StateContext.NODE_REPORT_PROTO_NAME); + context2.addReport(nodeReport); + + assertNull(context2.getContainerReports()); + assertNotNull(context2.getNodeReport()); + assertEquals(StateContext.NODE_REPORT_PROTO_NAME, + context2.getNodeReport().getDescriptorForType().getFullName()); + assertNull(context2.getPipelineReports()); + + // PipelineReports + StateContext context3 = newStateContext(conf, datanodeStateMachineMock); + GeneratedMessage pipelineReports = + newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME); + context3.addReport(pipelineReports); + + assertNull(context3.getContainerReports()); + assertNull(context3.getNodeReport()); + assertNotNull(context3.getPipelineReports()); + assertEquals(StateContext.PIPELINE_REPORTS_PROTO_NAME, + context3.getPipelineReports().getDescriptorForType().getFullName()); + } + + private StateContext newStateContext(OzoneConfiguration conf, + DatanodeStateMachine datanodeStateMachineMock) { + StateContext stateContext = new StateContext(conf, + DatanodeStates.getInitState(), datanodeStateMachineMock); + InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001); + stateContext.addEndpoint(scm1); + InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001); + stateContext.addEndpoint(scm2); + return stateContext; + } + + private GeneratedMessage newMockReport(String messageType) { + GeneratedMessage pipelineReports = mock(GeneratedMessage.class); + when(pipelineReports.getDescriptorForType()).thenReturn( + mock(Descriptor.class)); + when(pipelineReports.getDescriptorForType().getFullName()).thenReturn( + messageType); + return pipelineReports; + } + @Test public void testReportAPIs() { OzoneConfiguration conf = new OzoneConfiguration(); @@ -64,8 +338,14 @@ public void testReportAPIs() { InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001); InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001); - // Try to add report with endpoint. Should not be stored. - stateContext.addReport(mock(GeneratedMessage.class)); + GeneratedMessage generatedMessage = mock(GeneratedMessage.class); + when(generatedMessage.getDescriptorForType()).thenReturn( + mock(Descriptor.class)); + when(generatedMessage.getDescriptorForType().getFullName()).thenReturn( + "hadoop.hdds.CommandStatusReportsProto"); + + // Try to add report with zero endpoint. Should not be stored. + stateContext.addReport(generatedMessage); assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty()); // Add 2 scm endpoints. @@ -73,7 +353,7 @@ public void testReportAPIs() { stateContext.addEndpoint(scm2); // Add report. Should be added to all endpoints. - stateContext.addReport(mock(GeneratedMessage.class)); + stateContext.addReport(generatedMessage); List allAvailableReports = stateContext.getAllAvailableReports(scm1); assertEquals(1, allAvailableReports.size()); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java index febd1c3bd0df..d23f1c49bf13 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java @@ -44,6 +44,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -79,7 +80,10 @@ public void setup() throws Exception { Mockito.when(raftClient.getGroupManagementApi( Mockito.any(RaftPeerId.class))).thenReturn(raftClientGroupManager); PowerMockito.mockStatic(RaftClient.class); - PowerMockito.when(RaftClient.newBuilder()).thenReturn(builder); + // Work around for mockito bug: + // https://github.com/powermock/powermock/issues/992 + PowerMockito.when(RaftClient.newBuilder()).thenAnswer( + (Answer) invocation -> builder); } private RaftClient.Builder mockRaftClientBuilder() { diff --git a/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000000..3c9e1c8a6971 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +mock-maker-inline \ No newline at end of file