Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,23 @@ List<GeneratedMessage> getIncrementalReports(
return reportsToReturn;
}

List<GeneratedMessage> getNonIncrementalReports() {
List<GeneratedMessage> nonIncrementalReports = new LinkedList<>();
GeneratedMessage report = containerReports.get();
if (report != null) {
nonIncrementalReports.add(report);
}
report = nodeReport.get();
if (report != null) {
nonIncrementalReports.add(report);
}
report = pipelineReports.get();
if (report != null) {
nonIncrementalReports.add(report);
}
return nonIncrementalReports;
}

/**
* Returns available reports from the report queue with a max limit on
* list size, or empty list if the queue is empty.
Expand All @@ -326,21 +343,17 @@ List<GeneratedMessage> getIncrementalReports(
*/
public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
int maxLimit) {
List<GeneratedMessage> reportsToReturn =
getIncrementalReports(endpoint, maxLimit);
GeneratedMessage report = containerReports.get();
if (report != null) {
reportsToReturn.add(report);
if (maxLimit < 0) {
throw new IllegalArgumentException("Illegal maxLimit value: " + maxLimit);
}
report = nodeReport.get();
if (report != null) {
reportsToReturn.add(report);
List<GeneratedMessage> reports = getNonIncrementalReports();
if (maxLimit <= reports.size()) {
return reports.subList(0, maxLimit);
} else {
reports.addAll(getIncrementalReports(endpoint,
maxLimit - reports.size()));
return reports;
}
report = pipelineReports.get();
if (report != null) {
reportsToReturn.add(report);
}
return reportsToReturn;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,40 @@ public DatanodeStates await(long time, TimeUnit timeUnit) {
assertEquals(1, awaited.get());
assertEquals(1, executed.get());
}

@Test
public void testGetReports() {
OzoneConfiguration conf = new OzoneConfiguration();
DatanodeStateMachine datanodeStateMachineMock =
mock(DatanodeStateMachine.class);

StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
datanodeStateMachineMock);
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
ctx.addEndpoint(scm1);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
ctx.addEndpoint(scm2);
// Check initial state
assertEquals(0, ctx.getAllAvailableReports(scm1).size());
assertEquals(0, ctx.getAllAvailableReports(scm2).size());

Map<String, Integer> expectedReportCount = new HashMap<>();

// Add a bunch of ContainerReports
batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
batchAddReports(ctx,
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);

// Should only keep the latest one
expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
// Should keep less or equal than maxLimit depending on other reports' size.
expectedReportCount.put(
StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 97);
checkReportCount(ctx.getReports(scm1, 100), expectedReportCount);
checkReportCount(ctx.getReports(scm2, 100), expectedReportCount);
}
}