Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.util;

import java.io.IOException;
import java.util.Iterator;

/**
* An {@link Iterator} that may hold resources until it is closed.
*/
public interface SeekableIterator<K, E> extends ClosableIterator<E> {
void seek(K position) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -50,7 +51,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
Expand Down Expand Up @@ -80,11 +80,13 @@
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
import org.apache.hadoop.ozone.util.SeekableIterator;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Endpoint for querying keys that belong to a container.
*/
Expand Down Expand Up @@ -585,119 +587,94 @@ public Response getContainerMisMatchInsights(

List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList =
new ArrayList<>();
try {
Map<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainers(-1, -1);
List<Long> scmNonDeletedContainers =
containerManager.getContainers().stream()
.filter(containerInfo -> containerInfo.getState() !=
HddsProtos.LifeCycleState.DELETED)
.map(containerInfo -> containerInfo.getContainerID())
.collect(Collectors.toList());
DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());

Long minContainerID = prevKey + 1;
Iterator<ContainerInfo> scmNonDeletedContainers =
containerManager.getContainers().stream()
.filter(containerInfo -> (containerInfo.getContainerID() >= minContainerID))
.filter(containerInfo -> containerInfo.getState() != HddsProtos.LifeCycleState.DELETED)
.sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator();
ContainerInfo scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
try (SeekableIterator<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainersIterator()) {
omContainers.seek(minContainerID);
ContainerMetadata containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
switch (dataFilter) {

case SCM:
List<Map.Entry<Long, ContainerMetadata>> notSCMContainers =
omContainers.entrySet().stream()
.filter(
containerMetadataEntry -> !scmNonDeletedContainers.contains(
containerMetadataEntry.getKey()))
.collect(Collectors.toList());

if (prevKey > 0) {
int index = 0;
while (index < notSCMContainers.size() &&
notSCMContainers.get(index).getKey() <= prevKey) {
index++;
}
if (index < notSCMContainers.size()) {
notSCMContainers = notSCMContainers.subList(index,
Math.min(index + limit, notSCMContainers.size()));
List<ContainerMetadata> notSCMContainers = new ArrayList<>();
while (containerMetadata != null && notSCMContainers.size() < limit) {
Long omContainerID = containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo == null ? null : scmContainerInfo.getContainerID();
if (omContainerID.equals(scmContainerID)) {
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
} else if (scmContainerID == null || omContainerID.compareTo(scmContainerID) < 0) {
notSCMContainers.add(containerMetadata);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else {
notSCMContainers = Collections.emptyList();
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
}
} else {
notSCMContainers = notSCMContainers.subList(0,
Math.min(limit, notSCMContainers.size()));
}

notSCMContainers.forEach(nonSCMContainer -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getKey());
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(
nonSCMContainer.getValue().getNumberOfKeys());
nonSCMContainer.getNumberOfKeys());
containerDiscrepancyInfo.setPipelines(
nonSCMContainer.getValue().getPipelines());
nonSCMContainer.getPipelines());
containerDiscrepancyInfo.setExistsAt("OM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;

case OM:
List<Long> nonOMContainers = scmNonDeletedContainers.stream()
.filter(containerId -> !omContainers.containsKey(containerId))
.collect(Collectors.toList());

if (prevKey > 0) {
int index = 0;
while (index < nonOMContainers.size() &&
nonOMContainers.get(index) <= prevKey) {
index++;
}
if (index < nonOMContainers.size()) {
nonOMContainers = nonOMContainers.subList(index,
Math.min(index + limit, nonOMContainers.size()));
List<ContainerInfo> nonOMContainers = new ArrayList<>();
while (scmContainerInfo != null && nonOMContainers.size() < limit) {
Long omContainerID = containerMetadata == null ? null : containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo.getContainerID();
if (scmContainerID.equals(omContainerID)) {
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else if (omContainerID == null || scmContainerID.compareTo(omContainerID) < 0) {
nonOMContainers.add(scmContainerInfo);
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
} else {
nonOMContainers = Collections.emptyList();
//Seeking directly to SCM containerId sequential read is just wasteful here if there are too many values
// to be read in b/w omContainerID & scmContainerID since (omContainerId<scmContainerID)
omContainers.seek(scmContainerID);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
}
} else {
nonOMContainers = nonOMContainers.subList(0,
Math.min(limit, nonOMContainers.size()));
}

List<Pipeline> pipelines = new ArrayList<>();
nonOMContainers.forEach(nonOMContainerId -> {
boolean containerExistsInScm = true;
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(nonOMContainerId);
nonOMContainers.forEach(containerInfo -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo = new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(containerInfo.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(0);
PipelineID pipelineID = null;
try {
pipelineID = containerManager.getContainer(
ContainerID.valueOf(nonOMContainerId)).getPipelineID();
pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
pipelines.add(pipelineManager.getPipeline(pipelineID));
}
} catch (ContainerNotFoundException e) {
containerExistsInScm = false;
LOG.warn("Container {} not found in SCM: {}", nonOMContainerId,
e);
} catch (PipelineNotFoundException e) {
LOG.debug(
"Pipeline not found for container: {} and pipelineId: {}",
nonOMContainerId, pipelineID, e);
}
// The container might have been deleted in SCM after the call to
// get the list of containers
if (containerExistsInScm) {
containerDiscrepancyInfo.setPipelines(pipelines);
containerDiscrepancyInfo.setExistsAt("SCM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
containerInfo, pipelineID, e);
}
containerDiscrepancyInfo.setPipelines(pipelines);
containerDiscrepancyInfo.setExistsAt("SCM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;

default:
// Invalid filter parameter value
return Response.status(Response.Status.BAD_REQUEST).build();
}
} catch (IOException ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
} catch (IllegalArgumentException e) {
throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
} catch (Exception ex) {
Expand Down Expand Up @@ -745,43 +722,44 @@ public Response getOmContainersDeletedInSCM(
// Send back an empty response
return Response.status(Response.Status.NOT_ACCEPTABLE).build();
}
List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList =
new ArrayList<>();
try {
Map<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainers(limit, prevKey);

List<Long> deletedStateSCMContainerIds =
containerManager.getContainers().stream()
.filter(containerInfo -> (containerInfo.getState() ==
HddsProtos.LifeCycleState.DELETED))
.map(containerInfo -> containerInfo.getContainerID()).collect(
Collectors.toList());

List<Map.Entry<Long, ContainerMetadata>>
omContainersDeletedInSCM =
omContainers.entrySet().stream().filter(containerMetadataEntry ->
(deletedStateSCMContainerIds.contains(
containerMetadataEntry.getKey())))
.collect(
Collectors.toList());

omContainersDeletedInSCM.forEach(
containerMetadataEntry -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(
containerMetadataEntry.getKey());
containerDiscrepancyInfo.setNumberOfKeys(
containerMetadataEntry.getValue().getNumberOfKeys());
containerDiscrepancyInfo.setPipelines(
containerMetadataEntry.getValue()
.getPipelines());
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
} catch (IOException ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
if (limit <= 0) {
limit = Integer.MAX_VALUE;
}
long minContainerID = prevKey + 1;
Iterator<ContainerInfo> deletedStateSCMContainers = containerManager.getContainers().stream()
.filter(containerInfo -> containerInfo.getContainerID() >= minContainerID)
.filter(containerInfo -> containerInfo.getState() == HddsProtos.LifeCycleState.DELETED)
.sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator();
List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList;
try (SeekableIterator<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainersIterator()) {
ContainerInfo scmContainerInfo = deletedStateSCMContainers.hasNext() ? deletedStateSCMContainers.next() : null;
ContainerMetadata containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
List<ContainerMetadata> omContainersDeletedInSCM = new ArrayList<>();
while (containerMetadata != null && scmContainerInfo != null
&& omContainersDeletedInSCM.size() < limit) {
Long omContainerID = containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo.getContainerID();
if (scmContainerID.equals(omContainerID)) {
omContainersDeletedInSCM.add(containerMetadata);
scmContainerInfo = deletedStateSCMContainers.hasNext() ? deletedStateSCMContainers.next() : null;
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else if (scmContainerID.compareTo(omContainerID) < 0) {
scmContainerInfo = deletedStateSCMContainers.hasNext() ? deletedStateSCMContainers.next() : null;
} else {
// Seek directly to scmContainerId iterating sequentially is very wasteful here.
omContainers.seek(scmContainerID);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
}
}

containerDiscrepancyInfoList = omContainersDeletedInSCM.stream().map(containerMetadataEntry -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo = new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(containerMetadataEntry.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(containerMetadataEntry.getNumberOfKeys());
containerDiscrepancyInfo.setPipelines(containerMetadataEntry.getPipelines());
return containerDiscrepancyInfo;
}).collect(Collectors.toList());
} catch (IllegalArgumentException e) {
throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
import org.apache.hadoop.ozone.util.SeekableIterator;

/**
* The Recon Container DB Service interface.
Expand Down Expand Up @@ -186,6 +187,9 @@ Map<ContainerKeyPrefix, Integer> getKeyPrefixesForContainer(
Map<Long, ContainerMetadata> getContainers(int limit, long prevContainer)
throws IOException;


SeekableIterator<Long, ContainerMetadata> getContainersIterator() throws IOException;

/**
* Delete an entry in the container DB.
*
Expand Down
Loading