diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java new file mode 100644 index 000000000000..fb424b28cabd --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.util; + +import java.io.IOException; +import java.util.Iterator; + +/** + * An {@link Iterator} that may hold resources until it is closed. + */ +public interface SeekableIterator extends ClosableIterator { + void seek(K position) throws IOException; +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java index 041bcc8e6b00..b38e7138a133 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java @@ -29,8 +29,9 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -50,7 +51,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -80,11 +80,13 @@ import org.apache.hadoop.ozone.recon.scm.ReconContainerManager; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.apache.hadoop.ozone.util.SeekableIterator; import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Endpoint for querying keys that belong to a container. */ @@ -585,109 +587,87 @@ public Response getContainerMisMatchInsights( List containerDiscrepancyInfoList = new ArrayList<>(); - try { - Map omContainers = - reconContainerMetadataManager.getContainers(-1, -1); - List scmNonDeletedContainers = - containerManager.getContainers().stream() - .filter(containerInfo -> containerInfo.getState() != - HddsProtos.LifeCycleState.DELETED) - .map(containerInfo -> containerInfo.getContainerID()) - .collect(Collectors.toList()); - DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase()); - + Long minContainerID = prevKey + 1; + Iterator scmNonDeletedContainers = + containerManager.getContainers().stream() + .filter(containerInfo -> (containerInfo.getContainerID() >= minContainerID)) + .filter(containerInfo -> containerInfo.getState() != HddsProtos.LifeCycleState.DELETED) + .sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator(); + ContainerInfo scmContainerInfo = scmNonDeletedContainers.hasNext() ? + scmNonDeletedContainers.next() : null; + DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase()); + try (SeekableIterator omContainers = + reconContainerMetadataManager.getContainersIterator()) { + omContainers.seek(minContainerID); + ContainerMetadata containerMetadata = omContainers.hasNext() ? omContainers.next() : null; switch (dataFilter) { - case SCM: - List> notSCMContainers = - omContainers.entrySet().stream() - .filter( - containerMetadataEntry -> !scmNonDeletedContainers.contains( - containerMetadataEntry.getKey())) - .collect(Collectors.toList()); - - if (prevKey > 0) { - int index = 0; - while (index < notSCMContainers.size() && - notSCMContainers.get(index).getKey() <= prevKey) { - index++; - } - if (index < notSCMContainers.size()) { - notSCMContainers = notSCMContainers.subList(index, - Math.min(index + limit, notSCMContainers.size())); + List notSCMContainers = new ArrayList<>(); + while (containerMetadata != null && notSCMContainers.size() < limit) { + Long omContainerID = containerMetadata.getContainerID(); + Long scmContainerID = scmContainerInfo == null ? null : scmContainerInfo.getContainerID(); + if (omContainerID.equals(scmContainerID)) { + containerMetadata = omContainers.hasNext() ? omContainers.next() : null; + scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null; + } else if (scmContainerID == null || omContainerID.compareTo(scmContainerID) < 0) { + notSCMContainers.add(containerMetadata); + containerMetadata = omContainers.hasNext() ? omContainers.next() : null; } else { - notSCMContainers = Collections.emptyList(); + scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null; } - } else { - notSCMContainers = notSCMContainers.subList(0, - Math.min(limit, notSCMContainers.size())); } notSCMContainers.forEach(nonSCMContainer -> { ContainerDiscrepancyInfo containerDiscrepancyInfo = new ContainerDiscrepancyInfo(); - containerDiscrepancyInfo.setContainerID(nonSCMContainer.getKey()); + containerDiscrepancyInfo.setContainerID(nonSCMContainer.getContainerID()); containerDiscrepancyInfo.setNumberOfKeys( - nonSCMContainer.getValue().getNumberOfKeys()); + nonSCMContainer.getNumberOfKeys()); containerDiscrepancyInfo.setPipelines( - nonSCMContainer.getValue().getPipelines()); + nonSCMContainer.getPipelines()); containerDiscrepancyInfo.setExistsAt("OM"); containerDiscrepancyInfoList.add(containerDiscrepancyInfo); }); break; case OM: - List nonOMContainers = scmNonDeletedContainers.stream() - .filter(containerId -> !omContainers.containsKey(containerId)) - .collect(Collectors.toList()); - - if (prevKey > 0) { - int index = 0; - while (index < nonOMContainers.size() && - nonOMContainers.get(index) <= prevKey) { - index++; - } - if (index < nonOMContainers.size()) { - nonOMContainers = nonOMContainers.subList(index, - Math.min(index + limit, nonOMContainers.size())); + List nonOMContainers = new ArrayList<>(); + while (scmContainerInfo != null && nonOMContainers.size() < limit) { + Long omContainerID = containerMetadata == null ? null : containerMetadata.getContainerID(); + Long scmContainerID = scmContainerInfo.getContainerID(); + if (scmContainerID.equals(omContainerID)) { + scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null; + containerMetadata = omContainers.hasNext() ? omContainers.next() : null; + } else if (omContainerID == null || scmContainerID.compareTo(omContainerID) < 0) { + nonOMContainers.add(scmContainerInfo); + scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null; } else { - nonOMContainers = Collections.emptyList(); + //Seeking directly to SCM containerId sequential read is just wasteful here if there are too many values + // to be read in b/w omContainerID & scmContainerID since (omContainerId pipelines = new ArrayList<>(); - nonOMContainers.forEach(nonOMContainerId -> { - boolean containerExistsInScm = true; - ContainerDiscrepancyInfo containerDiscrepancyInfo = - new ContainerDiscrepancyInfo(); - containerDiscrepancyInfo.setContainerID(nonOMContainerId); + nonOMContainers.forEach(containerInfo -> { + ContainerDiscrepancyInfo containerDiscrepancyInfo = new ContainerDiscrepancyInfo(); + containerDiscrepancyInfo.setContainerID(containerInfo.getContainerID()); containerDiscrepancyInfo.setNumberOfKeys(0); PipelineID pipelineID = null; try { - pipelineID = containerManager.getContainer( - ContainerID.valueOf(nonOMContainerId)).getPipelineID(); + pipelineID = containerInfo.getPipelineID(); if (pipelineID != null) { pipelines.add(pipelineManager.getPipeline(pipelineID)); } - } catch (ContainerNotFoundException e) { - containerExistsInScm = false; - LOG.warn("Container {} not found in SCM: {}", nonOMContainerId, - e); } catch (PipelineNotFoundException e) { LOG.debug( "Pipeline not found for container: {} and pipelineId: {}", - nonOMContainerId, pipelineID, e); - } - // The container might have been deleted in SCM after the call to - // get the list of containers - if (containerExistsInScm) { - containerDiscrepancyInfo.setPipelines(pipelines); - containerDiscrepancyInfo.setExistsAt("SCM"); - containerDiscrepancyInfoList.add(containerDiscrepancyInfo); + containerInfo, pipelineID, e); } + containerDiscrepancyInfo.setPipelines(pipelines); + containerDiscrepancyInfo.setExistsAt("SCM"); + containerDiscrepancyInfoList.add(containerDiscrepancyInfo); }); break; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java index 24605c95a0e3..1400279d1244 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer; import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory; +import org.apache.hadoop.ozone.util.SeekableIterator; /** * The Recon Container DB Service interface. @@ -186,6 +187,9 @@ Map getKeyPrefixesForContainer( Map getContainers(int limit, long prevContainer) throws IOException; + + SeekableIterator getContainersIterator() throws IOException; + /** * Delete an entry in the container DB. * diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java index 27567333d9eb..17cb79398507 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java @@ -26,6 +26,7 @@ import jakarta.annotation.Nonnull; import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -38,6 +39,7 @@ import javax.inject.Singleton; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; @@ -54,6 +56,7 @@ import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory; import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistoryList; import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; +import org.apache.hadoop.ozone.util.SeekableIterator; import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao; import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats; import org.jooq.Configuration; @@ -431,53 +434,80 @@ public Map getContainers(int limit, long prevContainer) throws IOException { Map containers = new LinkedHashMap<>(); - try ( - TableIterator> - containerIterator = containerKeyTable.iterator()) { - ContainerKeyPrefix seekKey; - if (prevContainer > 0L) { - seekKey = ContainerKeyPrefix.get(prevContainer); - KeyValue seekKeyValue = containerIterator.seek(seekKey); - // Check if RocksDB was able to correctly seek to the given - // prevContainer containerId. If not, then return empty result - if (seekKeyValue != null && - seekKeyValue.getKey().getContainerId() != prevContainer) { - return containers; - } else { - // seek to the prevContainer+1 containerID to start scan - seekKey = ContainerKeyPrefix.get(prevContainer + 1); - containerIterator.seek(seekKey); - } + try (SeekableIterator containerIterator = getContainersIterator()) { + containerIterator.seek(prevContainer + 1); + while (containerIterator.hasNext() && ((limit < 0) || containers.size() < limit)) { + ContainerMetadata containerMetadata = containerIterator.next(); + containers.put(containerMetadata.getContainerID(), containerMetadata); } - while (containerIterator.hasNext()) { - KeyValue keyValue = - containerIterator.next(); - ContainerKeyPrefix containerKeyPrefix = keyValue.getKey(); - Long containerID = containerKeyPrefix.getContainerId(); - Integer numberOfKeys = keyValue.getValue(); - List pipelines = - getPipelines(containerKeyPrefix); - - // break the loop if limit has been reached - // and one more new entity needs to be added to the containers map - if (containers.size() == limit && - !containers.containsKey(containerID)) { - break; - } + } + return containers; + } + + @Override + public SeekableIterator getContainersIterator() + throws IOException { + return new ContainerMetadataIterator(); + } + + private class ContainerMetadataIterator implements SeekableIterator { + private TableIterator> containerIterator; + private KeyValue currentKey; + + ContainerMetadataIterator() + throws IOException { + containerIterator = containerKeyTable.iterator(); + currentKey = containerIterator.hasNext() ? containerIterator.next() : null; + } - // initialize containerMetadata with 0 as number of keys. - containers.computeIfAbsent(containerID, ContainerMetadata::new); - // increment number of keys for the containerID - ContainerMetadata containerMetadata = containers.get(containerID); - containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() + - numberOfKeys); - containerMetadata.setPipelines(pipelines); - containers.put(containerID, containerMetadata); + @Override + public void seek(Long containerID) throws IOException { + ContainerKeyPrefix seekKey = ContainerKeyPrefix.get(containerID); + containerIterator.seek(seekKey); + currentKey = containerIterator.hasNext() ? containerIterator.next() : null; + } + + @Override + public void close() { + try { + containerIterator.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public boolean hasNext() { + return currentKey != null; + } + + @Override + public ContainerMetadata next() { + try { + if (currentKey == null) { + return null; + } + Map pipelines = new HashMap<>(); + ContainerMetadata containerMetadata = new ContainerMetadata(currentKey.getKey().getContainerId()); + do { + ContainerKeyPrefix containerKeyPrefix = this.currentKey.getKey(); + containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() + 1); + getPipelines(containerKeyPrefix).forEach(pipeline -> { + pipelines.putIfAbsent(pipeline.getId(), pipeline); + }); + if (containerIterator.hasNext()) { + currentKey = containerIterator.next(); + } else { + currentKey = null; + } + } while (currentKey != null && + currentKey.getKey().getContainerId() == containerMetadata.getContainerID()); + containerMetadata.setPipelines(new ArrayList<>(pipelines.values())); + return containerMetadata; + } catch (IOException e) { + throw new UncheckedIOException(e); } } - return containers; } @Nonnull diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java index dbac3e5ee5ad..a3b5cef12338 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java @@ -365,8 +365,8 @@ public void testGetContainersWithPrevContainer() throws Exception { reconContainerMetadataManager.getContainers(-1, 0L); assertEquals(2, containerMap.size()); - assertEquals(3, containerMap.get(containerId).getNumberOfKeys()); - assertEquals(3, containerMap.get(nextContainerId).getNumberOfKeys()); + assertEquals(2, containerMap.get(containerId).getNumberOfKeys()); + assertEquals(1, containerMap.get(nextContainerId).getNumberOfKeys()); // test if limit works containerMap = reconContainerMetadataManager.getContainers(