Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.util;

import java.io.IOException;
import java.util.Iterator;

/**
* An {@link Iterator} that may hold resources until it is closed.
*/
public interface SeekableIterator<K, E> extends ClosableIterator<E> {
void seek(K position) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -50,7 +51,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
Expand Down Expand Up @@ -80,11 +80,13 @@
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
import org.apache.hadoop.ozone.util.SeekableIterator;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Endpoint for querying keys that belong to a container.
*/
Expand Down Expand Up @@ -585,109 +587,87 @@ public Response getContainerMisMatchInsights(

List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList =
new ArrayList<>();
try {
Map<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainers(-1, -1);
List<Long> scmNonDeletedContainers =
containerManager.getContainers().stream()
.filter(containerInfo -> containerInfo.getState() !=
HddsProtos.LifeCycleState.DELETED)
.map(containerInfo -> containerInfo.getContainerID())
.collect(Collectors.toList());
DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());

Long minContainerID = prevKey + 1;
Iterator<ContainerInfo> scmNonDeletedContainers =
containerManager.getContainers().stream()
.filter(containerInfo -> (containerInfo.getContainerID() >= minContainerID))
.filter(containerInfo -> containerInfo.getState() != HddsProtos.LifeCycleState.DELETED)
.sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator();
ContainerInfo scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
try (SeekableIterator<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainersIterator()) {
omContainers.seek(minContainerID);
ContainerMetadata containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
switch (dataFilter) {

case SCM:
List<Map.Entry<Long, ContainerMetadata>> notSCMContainers =
omContainers.entrySet().stream()
.filter(
containerMetadataEntry -> !scmNonDeletedContainers.contains(
containerMetadataEntry.getKey()))
.collect(Collectors.toList());

if (prevKey > 0) {
int index = 0;
while (index < notSCMContainers.size() &&
notSCMContainers.get(index).getKey() <= prevKey) {
index++;
}
if (index < notSCMContainers.size()) {
notSCMContainers = notSCMContainers.subList(index,
Math.min(index + limit, notSCMContainers.size()));
List<ContainerMetadata> notSCMContainers = new ArrayList<>();
while (containerMetadata != null && notSCMContainers.size() < limit) {
Long omContainerID = containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo == null ? null : scmContainerInfo.getContainerID();
if (omContainerID.equals(scmContainerID)) {
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
} else if (scmContainerID == null || omContainerID.compareTo(scmContainerID) < 0) {
notSCMContainers.add(containerMetadata);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else {
notSCMContainers = Collections.emptyList();
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
}
} else {
notSCMContainers = notSCMContainers.subList(0,
Math.min(limit, notSCMContainers.size()));
}

notSCMContainers.forEach(nonSCMContainer -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getKey());
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(
nonSCMContainer.getValue().getNumberOfKeys());
nonSCMContainer.getNumberOfKeys());
containerDiscrepancyInfo.setPipelines(
nonSCMContainer.getValue().getPipelines());
nonSCMContainer.getPipelines());
containerDiscrepancyInfo.setExistsAt("OM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;

case OM:
List<Long> nonOMContainers = scmNonDeletedContainers.stream()
.filter(containerId -> !omContainers.containsKey(containerId))
.collect(Collectors.toList());

if (prevKey > 0) {
int index = 0;
while (index < nonOMContainers.size() &&
nonOMContainers.get(index) <= prevKey) {
index++;
}
if (index < nonOMContainers.size()) {
nonOMContainers = nonOMContainers.subList(index,
Math.min(index + limit, nonOMContainers.size()));
List<ContainerInfo> nonOMContainers = new ArrayList<>();
while (scmContainerInfo != null && nonOMContainers.size() < limit) {
Long omContainerID = containerMetadata == null ? null : containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo.getContainerID();
if (scmContainerID.equals(omContainerID)) {
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else if (omContainerID == null || scmContainerID.compareTo(omContainerID) < 0) {
nonOMContainers.add(scmContainerInfo);
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
} else {
nonOMContainers = Collections.emptyList();
//Seeking directly to SCM containerId sequential read is just wasteful here if there are too many values
// to be read in b/w omContainerID & scmContainerID since (omContainerId<scmContainerID)
omContainers.seek(scmContainerID);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
}
} else {
nonOMContainers = nonOMContainers.subList(0,
Math.min(limit, nonOMContainers.size()));
}

List<Pipeline> pipelines = new ArrayList<>();
nonOMContainers.forEach(nonOMContainerId -> {
boolean containerExistsInScm = true;
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(nonOMContainerId);
nonOMContainers.forEach(containerInfo -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo = new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(containerInfo.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(0);
PipelineID pipelineID = null;
try {
pipelineID = containerManager.getContainer(
ContainerID.valueOf(nonOMContainerId)).getPipelineID();
pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
pipelines.add(pipelineManager.getPipeline(pipelineID));
}
} catch (ContainerNotFoundException e) {
containerExistsInScm = false;
LOG.warn("Container {} not found in SCM: {}", nonOMContainerId,
e);
} catch (PipelineNotFoundException e) {
LOG.debug(
"Pipeline not found for container: {} and pipelineId: {}",
nonOMContainerId, pipelineID, e);
}
// The container might have been deleted in SCM after the call to
// get the list of containers
if (containerExistsInScm) {
containerDiscrepancyInfo.setPipelines(pipelines);
containerDiscrepancyInfo.setExistsAt("SCM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
containerInfo, pipelineID, e);
}
containerDiscrepancyInfo.setPipelines(pipelines);
containerDiscrepancyInfo.setExistsAt("SCM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
import org.apache.hadoop.ozone.util.SeekableIterator;

/**
* The Recon Container DB Service interface.
Expand Down Expand Up @@ -186,6 +187,9 @@ Map<ContainerKeyPrefix, Integer> getKeyPrefixesForContainer(
Map<Long, ContainerMetadata> getContainers(int limit, long prevContainer)
throws IOException;


SeekableIterator<Long, ContainerMetadata> getContainersIterator() throws IOException;

/**
* Delete an entry in the container DB.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -38,6 +39,7 @@
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
Expand All @@ -54,6 +56,7 @@
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistoryList;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.util.SeekableIterator;
import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
import org.jooq.Configuration;
Expand Down Expand Up @@ -431,53 +434,80 @@ public Map<Long, ContainerMetadata> getContainers(int limit,
long prevContainer)
throws IOException {
Map<Long, ContainerMetadata> containers = new LinkedHashMap<>();
try (
TableIterator<ContainerKeyPrefix,
? extends KeyValue<ContainerKeyPrefix, Integer>>
containerIterator = containerKeyTable.iterator()) {
ContainerKeyPrefix seekKey;
if (prevContainer > 0L) {
seekKey = ContainerKeyPrefix.get(prevContainer);
KeyValue<ContainerKeyPrefix,
Integer> seekKeyValue = containerIterator.seek(seekKey);
// Check if RocksDB was able to correctly seek to the given
// prevContainer containerId. If not, then return empty result
if (seekKeyValue != null &&
seekKeyValue.getKey().getContainerId() != prevContainer) {
return containers;
} else {
// seek to the prevContainer+1 containerID to start scan
seekKey = ContainerKeyPrefix.get(prevContainer + 1);
containerIterator.seek(seekKey);
}
try (SeekableIterator<Long, ContainerMetadata> containerIterator = getContainersIterator()) {
containerIterator.seek(prevContainer + 1);
while (containerIterator.hasNext() && ((limit < 0) || containers.size() < limit)) {
ContainerMetadata containerMetadata = containerIterator.next();
containers.put(containerMetadata.getContainerID(), containerMetadata);
}
while (containerIterator.hasNext()) {
KeyValue<ContainerKeyPrefix, Integer> keyValue =
containerIterator.next();
ContainerKeyPrefix containerKeyPrefix = keyValue.getKey();
Long containerID = containerKeyPrefix.getContainerId();
Integer numberOfKeys = keyValue.getValue();
List<Pipeline> pipelines =
getPipelines(containerKeyPrefix);

// break the loop if limit has been reached
// and one more new entity needs to be added to the containers map
if (containers.size() == limit &&
!containers.containsKey(containerID)) {
break;
}
}
return containers;
}

@Override
public SeekableIterator<Long, ContainerMetadata> getContainersIterator()
throws IOException {
return new ContainerMetadataIterator();
}

private class ContainerMetadataIterator implements SeekableIterator<Long, ContainerMetadata> {
private TableIterator<ContainerKeyPrefix, ? extends KeyValue<ContainerKeyPrefix, Integer>> containerIterator;
private KeyValue<ContainerKeyPrefix, Integer> currentKey;

ContainerMetadataIterator()
throws IOException {
containerIterator = containerKeyTable.iterator();
currentKey = containerIterator.hasNext() ? containerIterator.next() : null;
}

// initialize containerMetadata with 0 as number of keys.
containers.computeIfAbsent(containerID, ContainerMetadata::new);
// increment number of keys for the containerID
ContainerMetadata containerMetadata = containers.get(containerID);
containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() +
numberOfKeys);
containerMetadata.setPipelines(pipelines);
containers.put(containerID, containerMetadata);
@Override
public void seek(Long containerID) throws IOException {
ContainerKeyPrefix seekKey = ContainerKeyPrefix.get(containerID);
containerIterator.seek(seekKey);
currentKey = containerIterator.hasNext() ? containerIterator.next() : null;
}

@Override
public void close() {
try {
containerIterator.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public boolean hasNext() {
return currentKey != null;
}

@Override
public ContainerMetadata next() {
try {
if (currentKey == null) {
return null;
}
Map<PipelineID, Pipeline> pipelines = new HashMap<>();
ContainerMetadata containerMetadata = new ContainerMetadata(currentKey.getKey().getContainerId());
do {
ContainerKeyPrefix containerKeyPrefix = this.currentKey.getKey();
containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() + 1);
getPipelines(containerKeyPrefix).forEach(pipeline -> {
pipelines.putIfAbsent(pipeline.getId(), pipeline);
});
if (containerIterator.hasNext()) {
currentKey = containerIterator.next();
} else {
currentKey = null;
}
} while (currentKey != null &&
currentKey.getKey().getContainerId() == containerMetadata.getContainerID());
containerMetadata.setPipelines(new ArrayList<>(pipelines.values()));
return containerMetadata;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return containers;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ public void testGetContainersWithPrevContainer() throws Exception {
reconContainerMetadataManager.getContainers(-1, 0L);
assertEquals(2, containerMap.size());

assertEquals(3, containerMap.get(containerId).getNumberOfKeys());
assertEquals(3, containerMap.get(nextContainerId).getNumberOfKeys());
assertEquals(2, containerMap.get(containerId).getNumberOfKeys());
assertEquals(1, containerMap.get(nextContainerId).getNumberOfKeys());

// test if limit works
containerMap = reconContainerMetadataManager.getContainers(
Expand Down
Loading