Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ public OzoneFileStatus getOzoneFileStatus(String volumeName,
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
return ozoneManagerClient.getFileStatus(keyArgs);
}
Expand Down Expand Up @@ -1081,6 +1082,7 @@ public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setRefreshPipeline(true)
.build();
return ozoneManagerClient
.listStatus(keyArgs, recursive, startKey, numEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ public OzoneFileStatus(OmKeyInfo key, long blockSize, boolean isDirectory) {
keyInfo = key;
}

public OzoneFileStatus(FileStatus status) throws IOException {
public OzoneFileStatus(FileStatus status, OmKeyInfo key) throws IOException {
super(status);
keyInfo = key;
}

// Use this constructor only for directories
Expand All @@ -54,13 +55,18 @@ public OzoneFileStatus(String keyName) {
}

public OzoneFileStatusProto getProtobuf() throws IOException {
return OzoneFileStatusProto.newBuilder().setStatus(PBHelper.convert(this))
.build();
OzoneFileStatusProto.Builder builder = OzoneFileStatusProto.newBuilder()
.setStatus(PBHelper.convert(this));
if (keyInfo != null) {
builder.setKeyInfo(keyInfo.getProtobuf());
}
return builder.build();
}

public static OzoneFileStatus getFromProtobuf(OzoneFileStatusProto response)
throws IOException {
return new OzoneFileStatus(PBHelper.convert(response.getStatus()));
return new OzoneFileStatus(PBHelper.convert(response.getStatus()),
OmKeyInfo.getFromProtobuf(response.getKeyInfo()));
}

public static Path getPath(String keyName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ message RepeatedKeyInfo {

message OzoneFileStatusProto {
required hadoop.fs.FileStatusProto status = 1;
optional KeyInfo keyInfo = 2;
}

message GetFileStatusRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
Expand All @@ -51,6 +55,7 @@
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import org.junit.After;
import org.junit.Assert;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -312,6 +317,54 @@ public void testOzoneManagerFileSystemInterface() throws IOException {
assertEquals(omStatus.getPath().getName(), o3fs.pathToKey(path));
}

@Test
public void testOzoneManagerLocatedFileStatus() throws IOException {
String data = RandomStringUtils.randomAlphanumeric(20);
String filePath = RandomStringUtils.randomAlphanumeric(5);
Path path = createPath("/" + filePath);
try (FSDataOutputStream stream = fs.create(path)) {
stream.writeBytes(data);
}
FileStatus status = fs.getFileStatus(path);
assertTrue(status instanceof LocatedFileStatus);
LocatedFileStatus locatedFileStatus = (LocatedFileStatus) status;
assertTrue(locatedFileStatus.getBlockLocations().length >= 1);

for (BlockLocation blockLocation : locatedFileStatus.getBlockLocations()) {
assertTrue(blockLocation.getNames().length >= 1);
assertTrue(blockLocation.getHosts().length >= 1);
}
}

@Test
public void testOzoneManagerLocatedFileStatusBlockOffsetsWithMultiBlockFile()
throws Exception {
// naive assumption: MiniOzoneCluster will not have larger than ~1GB
// block size when running this test.
int blockSize = (int) fs.getConf().getStorageSize(
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT,
StorageUnit.BYTES
);
String data = RandomStringUtils.randomAlphanumeric(2*blockSize+837);
String filePath = RandomStringUtils.randomAlphanumeric(5);
Path path = createPath("/" + filePath);
try (FSDataOutputStream stream = fs.create(path)) {
stream.writeBytes(data);
}
FileStatus status = fs.getFileStatus(path);
assertTrue(status instanceof LocatedFileStatus);
LocatedFileStatus locatedFileStatus = (LocatedFileStatus) status;
BlockLocation[] blockLocations = locatedFileStatus.getBlockLocations();

assertEquals(0, blockLocations[0].getOffset());
assertEquals(blockSize, blockLocations[1].getOffset());
assertEquals(2*blockSize, blockLocations[2].getOffset());
assertEquals(blockSize, blockLocations[0].getLength());
assertEquals(blockSize, blockLocations[1].getLength());
assertEquals(837, blockLocations[2].getLength());
}

@Test
public void testPathToKey() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
Expand Down Expand Up @@ -686,31 +687,35 @@ public OmKeyInfo lookupKey(OmKeyArgs args, String clientAddress)
*/
@VisibleForTesting
protected void refreshPipeline(OmKeyInfo value) throws IOException {
Map<Long, ContainerWithPipeline> containerWithPipelineMap = new HashMap<>();
for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
for (OmKeyLocationInfo k : key.getLocationList()) {
// TODO: fix Some tests that may not initialize container client
// The production should always have containerClient initialized.
if (scmClient.getContainerClient() != null) {
try {
if (!containerWithPipelineMap.containsKey(k.getContainerID())) {
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerClient()
.getContainerWithPipeline(k.getContainerID());
containerWithPipelineMap.put(k.getContainerID(),
containerWithPipeline);
if (value != null &&
CollectionUtils.isNotEmpty(value.getKeyLocationVersions())) {
Map<Long, ContainerWithPipeline> containerWithPipelineMap =
new HashMap<>();
for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
for (OmKeyLocationInfo k : key.getLocationList()) {
// TODO: fix Some tests that may not initialize container client
// The production should always have containerClient initialized.
if (scmClient.getContainerClient() != null) {
try {
if (!containerWithPipelineMap.containsKey(k.getContainerID())) {
ContainerWithPipeline containerWithPipeline = scmClient
.getContainerClient()
.getContainerWithPipeline(k.getContainerID());
containerWithPipelineMap.put(k.getContainerID(),
containerWithPipeline);
}
} catch (IOException ioEx) {
LOG.debug("Get containerPipeline failed for volume:{} bucket:{} "
+ "key:{}", value.getVolumeName(), value.getBucketName(),
value.getKeyName(), ioEx);
throw new OMException(ioEx.getMessage(),
SCM_GET_PIPELINE_EXCEPTION);
}
ContainerWithPipeline cp =
containerWithPipelineMap.get(k.getContainerID());
if (!cp.getPipeline().equals(k.getPipeline())) {
k.setPipeline(cp.getPipeline());
}
} catch (IOException ioEx) {
LOG.debug("Get containerPipeline failed for volume:{} bucket:{} " +
"key:{}", value.getVolumeName(), value.getBucketName(),
value.getKeyName(), ioEx);
throw new OMException(ioEx.getMessage(),
SCM_GET_PIPELINE_EXCEPTION);
}
ContainerWithPipeline cp =
containerWithPipelineMap.get(k.getContainerID());
if (!cp.getPipeline().equals(k.getPipeline())) {
k.setPipeline(cp.getPipeline());
}
}
}
Expand Down Expand Up @@ -1687,6 +1692,9 @@ public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
volumeName, bucketName, keyName);
OmKeyInfo fileKeyInfo = metadataManager.getKeyTable().get(fileKeyBytes);
if (fileKeyInfo != null) {
if (args.getRefreshPipeline()) {
refreshPipeline(fileKeyInfo);
}
// this is a file
return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false);
}
Expand Down Expand Up @@ -2024,6 +2032,9 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
for (Map.Entry<String, OzoneFileStatus> entry : cacheKeyMap.entrySet()) {
// No need to check if a key is deleted or not here, this is handled
// when adding entries to cacheKeyMap from DB.
if (args.getRefreshPipeline()) {
refreshPipeline(entry.getValue().getKeyInfo());
}
fileStatusList.add(entry.getValue());
countEntries++;
if (countEntries >= numEntries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ private GetFileStatusResponse getOzoneFileStatus(
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setRefreshPipeline(true)
.build();

GetFileStatusResponse.Builder rb = GetFileStatusResponse.newBuilder();
Expand Down Expand Up @@ -588,6 +589,7 @@ private ListStatusResponse listStatus(
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setRefreshPipeline(true)
.build();
List<OzoneFileStatus> statuses =
impl.listStatus(omKeyArgs, request.getRecursive(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OmUtils;
Expand All @@ -46,6 +49,9 @@
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -74,6 +80,7 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
private ReplicationType replicationType;
private ReplicationFactor replicationFactor;
private boolean securityEnabled;
private int configuredDnPort;

/**
* Create new OzoneClientAdapter implementation.
Expand Down Expand Up @@ -168,6 +175,9 @@ public BasicOzoneClientAdapterImpl(String omHost, int omPort,
this.bucket = volume.getBucket(bucketStr);
this.replicationType = ReplicationType.valueOf(replicationTypeConf);
this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
this.configuredDnPort = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
Expand Down Expand Up @@ -440,7 +450,64 @@ private FileStatusAdapter toFileStatusAdapter(OzoneFileStatus status) {
status.getPermission().toShort(),
status.getOwner(),
status.getGroup(),
status.getPath()
status.getPath(),
getBlockLocations(status)
);
}

/**
* Helper method to get List of BlockLocation from OM Key info.
* @param fileStatus Ozone key file status.
* @return list of block locations.
*/
private BlockLocation[] getBlockLocations(OzoneFileStatus fileStatus) {

if (fileStatus == null) {
return new BlockLocation[0];
}

OmKeyInfo keyInfo = fileStatus.getKeyInfo();
if (keyInfo == null || CollectionUtils.isEmpty(
keyInfo.getKeyLocationVersions())) {
return new BlockLocation[0];
}
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups =
keyInfo.getKeyLocationVersions();
if (CollectionUtils.isEmpty(omKeyLocationInfoGroups)) {
return new BlockLocation[0];
}

OmKeyLocationInfoGroup omKeyLocationInfoGroup =
keyInfo.getLatestVersionLocations();
BlockLocation[] blockLocations = new BlockLocation[
omKeyLocationInfoGroup.getBlocksLatestVersionOnly().size()];

int i = 0;
long offsetOfBlockInFile = 0L;
for (OmKeyLocationInfo omKeyLocationInfo :
omKeyLocationInfoGroup.getBlocksLatestVersionOnly()) {
List<String> hostList = new ArrayList<>();
List<String> nameList = new ArrayList<>();
omKeyLocationInfo.getPipeline().getNodes()
.forEach(dn -> {
hostList.add(dn.getHostName());
int port = dn.getPort(
DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) {
port = configuredDnPort;
}
nameList.add(dn.getHostName() + ":" + port);
});

String[] hosts = hostList.toArray(new String[hostList.size()]);
String[] names = nameList.toArray(new String[nameList.size()]);
BlockLocation blockLocation = new BlockLocation(
names, hosts, offsetOfBlockInFile,
omKeyLocationInfo.getLength());
offsetOfBlockInFile += omKeyLocationInfo.getLength();
blockLocations[i++] = blockLocation;
}
return blockLocations;
}

}
Loading