KeyManagerImpl.java
@@ -39,6 +39,8 @@
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
@@ -134,6 +136,7 @@
import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
import static org.apache.hadoop.util.Time.monotonicNow;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -1520,7 +1523,8 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
if (args.getLatestVersionLocation()) {
slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
}
refreshPipeline(keyInfoList);

refreshPipelineFromCache(keyInfoList);

if (args.getSortDatanodes()) {
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
@@ -1652,7 +1656,7 @@ private List<OzoneFileStatus> sortPipelineInfo(
// refreshPipeline flag check has been removed as part of
// https://issues.apache.org/jira/browse/HDDS-3658.
// Please refer to that jira for more details.
refreshPipeline(keyInfoList);
refreshPipelineFromCache(keyInfoList);

if (omKeyArgs.getSortDatanodes()) {
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
@@ -1914,18 +1918,39 @@ public OmKeyInfo getKeyInfo(OmKeyArgs args, String clientAddress)
return value;
}

private void refreshPipelineFromCache(Iterable<OmKeyInfo> keyInfos)
throws IOException {
Set<Long> containerIds = new HashSet<>();
for (OmKeyInfo keyInfo : keyInfos) {
extractContainerIDs(keyInfo).forEach(containerIds::add);
}

// The list API never forces a cache refresh. If a client detects that a
// block location is outdated, it calls getKeyInfo with cacheRefresh=true
// to refresh the cache entry for that individual container.
Map<Long, Pipeline> containerLocations =
scmClient.getContainerLocations(containerIds, false);

for (OmKeyInfo keyInfo : keyInfos) {
setUpdatedContainerLocation(keyInfo, containerLocations);
}
}

protected void refreshPipelineFromCache(OmKeyInfo keyInfo,
boolean forceRefresh)
throws IOException {
Set<Long> containerIds = keyInfo.getKeyLocationVersions().stream()
.flatMap(v -> v.getLocationList().stream())
.map(BlockLocationInfo::getContainerID)
Set<Long> containerIds = extractContainerIDs(keyInfo)
.collect(Collectors.toSet());

metrics.setForceContainerCacheRefresh(forceRefresh);
Map<Long, Pipeline> containerLocations =
scmClient.getContainerLocations(containerIds, forceRefresh);

setUpdatedContainerLocation(keyInfo, containerLocations);
}

private void setUpdatedContainerLocation(OmKeyInfo keyInfo,
Map<Long, Pipeline> containerLocations) {
for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
for (List<OmKeyLocationInfo> omKeyLocationInfoList :
key.getLocationLists()) {
@@ -1940,4 +1965,11 @@ protected void refreshPipelineFromCache(OmKeyInfo keyInfo,
}
}
}

@NotNull
private Stream<Long> extractContainerIDs(OmKeyInfo keyInfo) {
return keyInfo.getKeyLocationVersions().stream()
.flatMap(v -> v.getLocationList().stream())
.map(BlockLocationInfo::getContainerID);
}
}
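
Taken together, these changes make listStatus gather the container IDs of every key in one pass, resolve them through the SCM container-location cache in a single batched call, and write the returned pipelines back onto each key. A minimal, self-contained sketch of that pattern, with simplified stand-ins (LocationSource for ScmClient, and bare KeyInfo/Pipeline holders in place of the Ozone types):

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BatchedPipelineRefresh {

  static final class Pipeline { }                 // stand-in for hdds Pipeline

  static final class KeyInfo {                    // stand-in for OmKeyInfo
    final List<Long> containerIds;                // container IDs of the key's blocks
    final Map<Long, Pipeline> pipelines = new HashMap<>();
    KeyInfo(List<Long> containerIds) { this.containerIds = containerIds; }
  }

  interface LocationSource {                      // stand-in for ScmClient
    Map<Long, Pipeline> getLocations(Set<Long> ids, boolean forceRefresh);
  }

  // One batched, cache-backed SCM lookup for the whole listing instead of
  // one refresh per key.
  static void refreshFromCache(Iterable<KeyInfo> keys, LocationSource scm) {
    Set<Long> ids = new HashSet<>();
    for (KeyInfo key : keys) {
      ids.addAll(key.containerIds);               // dedupe IDs across keys
    }
    Map<Long, Pipeline> locations = scm.getLocations(ids, false);
    for (KeyInfo key : keys) {
      for (Long id : key.containerIds) {
        Pipeline p = locations.get(id);
        if (p != null) {                          // skip containers SCM couldn't resolve
          key.pipelines.put(id, p);
        }
      }
    }
  }
}

Keeping forceRefresh=false on the list path holds directory listings to one cheap lookup; per the comment above, a stale entry is repaired later by a getKeyInfo call with cacheRefresh=true.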
ScmClient.java
@@ -19,17 +19,18 @@

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
import org.apache.hadoop.util.CacheMetrics;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -49,7 +50,6 @@ public class ScmClient {
private final StorageContainerLocationProtocol containerClient;
private final LoadingCache<Long, Pipeline> containerLocationCache;
private final CacheMetrics containerCacheMetrics;
private SCMUpdateServiceGrpcClient updateServiceGrpcClient;

ScmClient(ScmBlockLocationProtocol blockClient,
StorageContainerLocationProtocol containerClient,
@@ -104,15 +104,6 @@ public StorageContainerLocationProtocol getContainerClient() {
return this.containerClient;
}

public void setUpdateServiceGrpcClient(
SCMUpdateServiceGrpcClient updateClient) {
this.updateServiceGrpcClient = updateClient;
}

public SCMUpdateServiceGrpcClient getUpdateServiceGrpcClient() {
return updateServiceGrpcClient;
}

public Map<Long, Pipeline> getContainerLocations(Iterable<Long> containerIds,
boolean forceRefresh)
throws IOException {
@@ -123,6 +114,18 @@ public Map<Long, Pipeline> getContainerLocations(Iterable<Long> containerIds,
return containerLocationCache.getAll(containerIds);
} catch (ExecutionException e) {
return handleCacheExecutionException(e);
} catch (InvalidCacheLoadException e) {
// This is thrown when a container is not found in SCM. In that case,
// return the locations that are available instead of propagating the
// exception to client code.
Map<Long, Pipeline> result = new HashMap<>();
for (Long containerId : containerIds) {
Pipeline p = containerLocationCache.getIfPresent(containerId);
if (p != null) {
result.put(containerId, p);
}
}
return result;
}
}

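The new catch clause relies on documented Guava behavior: LoadingCache.getAll throws CacheLoader.InvalidCacheLoadException when the loader returns null for a requested key (here, a container SCM does not know about), while keys that did load successfully remain cached. A small runnable sketch of that behavior and of the getIfPresent fallback, using a toy loader that fails on odd IDs:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;

import java.util.HashMap;
import java.util.Map;

public class PartialCacheLoadDemo {
  public static void main(String[] args) throws Exception {
    LoadingCache<Long, String> cache = CacheBuilder.newBuilder()
        .build(new CacheLoader<Long, String>() {
          @Override
          public String load(Long id) {
            // Null from the loader stands in for "container not found in
            // SCM" and makes getAll throw InvalidCacheLoadException.
            return id % 2 == 0 ? "pipeline-" + id : null;
          }
        });

    cache.get(2L); // container 2 is now cached
    Map<Long, String> result;
    try {
      result = cache.getAll(ImmutableSet.of(2L, 3L));
    } catch (InvalidCacheLoadException e) {
      // Container 3 failed to load; fall back to what is already cached.
      result = new HashMap<>();
      for (Long id : ImmutableSet.of(2L, 3L)) {
        String p = cache.getIfPresent(id);
        if (p != null) {
          result.put(id, p);
        }
      }
    }
    System.out.println(result); // {2=pipeline-2}
  }
}

The fallback trades completeness for availability: callers get pipelines for the containers that could be resolved rather than an exception for the whole batch.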
TestKeyManagerUnit.java
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,6 +85,7 @@
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -574,7 +576,7 @@ public void listStatus() throws Exception {
.map(DatanodeDetails::getUuidString)
.collect(toList());

List<Long> containerIDs = new ArrayList<>();
Set<Long> containerIDs = new HashSet<>();
List<ContainerWithPipeline> containersWithPipeline = new ArrayList<>();
for (long i = 1; i <= 10; i++) {
final OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
@@ -622,6 +624,12 @@
Assert.assertEquals(10, fileStatusList.size());
verify(containerClient).getContainerWithPipelineBatch(containerIDs);
verify(blockClient).sortDatanodes(nodes, client);

// Call listStatus a second time and verify that no additional
// calls to SCM are made.
keyManager.listStatus(builder.build(), false,
null, Long.MAX_VALUE, client);
verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet());
}
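
The added verification works because Mockito counts invocations cumulatively: times(1) asserted after the second listStatus call proves the second listing was served entirely from the container-location cache. A minimal sketch of the idiom, with a hypothetical Locations collaborator and a hand-rolled read-through map standing in for the Guava cache:

import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CacheHitVerifyDemo {
  interface Locations {                          // hypothetical SCM-like lookup
    Map<Long, String> fetch(Set<Long> ids);
  }

  public static void main(String[] args) {
    Locations backend = mock(Locations.class);
    when(backend.fetch(anySet()))
        .thenReturn(Collections.singletonMap(1L, "pipeline-1"));

    Map<Long, String> cache = new HashMap<>();
    Set<Long> ids = Collections.singleton(1L);
    for (int call = 0; call < 2; call++) {       // two "listStatus" calls
      if (!cache.keySet().containsAll(ids)) {
        cache.putAll(backend.fetch(ids));        // only the first call misses
      }
    }

    // Mockito counts invocations cumulatively, so times(1) across both
    // iterations proves the second lookup was served from the cache.
    verify(backend, times(1)).fetch(anySet());
  }
}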

@Test