Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions hadoop-tools/hadoop-aliyun/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.hadoop.util.Progressable;

import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;

import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
Expand Down Expand Up @@ -271,14 +270,15 @@ public FileStatus getFileStatus(Path path) throws IOException {
meta = store.getObjectMetadata(key);
}
if (meta == null) {
ObjectListing listing = store.listObjects(key, 1, null, false);
OSSListRequest listRequest = store.createListObjectsRequest(key,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks we can do more optimization here for versioning/non-versioning bucket; But the change here should be good enough for now.

maxKeys, null, null, false);
OSSListResult listing = store.listObjects(listRequest);
do {
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
} else if (listing.isTruncated()) {
listing = store.listObjects(key, 1000, listing.getNextMarker(),
false);
listing = store.continueListObjects(listRequest, listing);
} else {
throw new FileNotFoundException(
path + ": No such file or directory!");
Expand Down Expand Up @@ -416,7 +416,9 @@ public FileStatus[] listStatus(Path path) throws IOException {
LOG.debug("listStatus: doing listObjects for directory " + key);
}

ObjectListing objects = store.listObjects(key, maxKeys, null, false);
OSSListRequest listRequest = store.createListObjectsRequest(key,
maxKeys, null, null, false);
OSSListResult objects = store.listObjects(listRequest);
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String objKey = objectSummary.getKey();
Expand Down Expand Up @@ -456,8 +458,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
String nextMarker = objects.getNextMarker();
objects = store.listObjects(key, maxKeys, nextMarker, false);
objects = store.continueListObjects(listRequest, objects);
} else {
break;
}
Expand Down Expand Up @@ -520,7 +521,7 @@ private RemoteIterator<LocatedFileStatus> innerList(final Path f,
locations);
} else {
return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
acceptor, recursive ? null : "/");
acceptor, recursive);
}
}

Expand Down Expand Up @@ -707,7 +708,9 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
ExecutorService executorService = MoreExecutors.listeningDecorator(
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
OSSListRequest listRequest = store.createListObjectsRequest(srcKey,
maxKeys, null, null, true);
OSSListResult objects = store.listObjects(listRequest);
// Copy files from src folder to dst
int copiesToFinish = 0;
while (true) {
Expand All @@ -729,8 +732,7 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
}
}
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
objects = store.continueListObjects(listRequest, objects);
} else {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.ListObjectsV2Request;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult;
Expand Down Expand Up @@ -90,6 +90,7 @@ public class AliyunOSSFileSystemStore {
private long uploadPartSize;
private int maxKeys;
private String serverSideEncryptionAlgorithm;
private boolean useListV1;

public void initialize(URI uri, Configuration conf, String user,
FileSystem.Statistics stat) throws IOException {
Expand Down Expand Up @@ -170,6 +171,12 @@ public void initialize(URI uri, Configuration conf, String user,
}

maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
if (listVersion < 1 || listVersion > 2) {
LOG.warn("Configured fs.oss.list.version {} is invalid, forcing " +
"version 2", listVersion);
}
useListV1 = (listVersion == 1);
}

/**
Expand Down Expand Up @@ -231,22 +238,23 @@ public void deleteObjects(List<String> keysToDelete) throws IOException {
* @throws IOException if failed to delete directory.
*/
public void deleteDirs(String key) throws IOException {
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(key);
listRequest.setDelimiter(null);
listRequest.setMaxKeys(maxKeys);

OSSListRequest listRequest = createListObjectsRequest(key,
maxKeys, null, null, true);
while (true) {
ObjectListing objects = ossClient.listObjects(listRequest);
OSSListResult objects = listObjects(listRequest);
statistics.incrementReadOps(1);
List<String> keysToDelete = new ArrayList<String>();
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
keysToDelete.add(objectSummary.getKey());
}
deleteObjects(keysToDelete);
if (objects.isTruncated()) {
listRequest.setMarker(objects.getNextMarker());
if (objects.isV1()) {
listRequest.getV1().setMarker(objects.getV1().getNextMarker());
} else {
listRequest.getV2().setContinuationToken(
objects.getV2().getNextContinuationToken());
}
} else {
break;
}
Expand Down Expand Up @@ -418,25 +426,76 @@ public void uploadObject(String key, File file) throws IOException {
/**
* list objects.
*
* @param listRequest list request.
* @return a list of matches.
*/
public OSSListResult listObjects(OSSListRequest listRequest) {
OSSListResult listResult;
if (listRequest.isV1()) {
listResult = OSSListResult.v1(
ossClient.listObjects(listRequest.getV1()));
} else {
listResult = OSSListResult.v2(
ossClient.listObjectsV2(listRequest.getV2()));
}
statistics.incrementReadOps(1);
return listResult;
}

/**
* continue to list objects depends on previous list result.
*
* @param listRequest list request.
* @param preListResult previous list result.
* @return a list of matches.
*/
public OSSListResult continueListObjects(OSSListRequest listRequest,
OSSListResult preListResult) {
OSSListResult listResult;
if (listRequest.isV1()) {
listRequest.getV1().setMarker(preListResult.getV1().getNextMarker());
listResult = OSSListResult.v1(
ossClient.listObjects(listRequest.getV1()));
} else {
listRequest.getV2().setContinuationToken(
preListResult.getV2().getNextContinuationToken());
listResult = OSSListResult.v2(
ossClient.listObjectsV2(listRequest.getV2()));
}
statistics.incrementReadOps(1);
return listResult;
}

/**
* create list objects request.
*
* @param prefix prefix.
* @param maxListingLength max no. of entries
* @param marker last key in any previous search.
* @param continuationToken list from a specific point.
* @param recursive whether to list directory recursively.
* @return a list of matches.
*/
public ObjectListing listObjects(String prefix, int maxListingLength,
String marker, boolean recursive) {
protected OSSListRequest createListObjectsRequest(String prefix,
int maxListingLength, String marker,
String continuationToken, boolean recursive) {
String delimiter = recursive ? null : "/";
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(prefix);
listRequest.setDelimiter(delimiter);
listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(marker);

ObjectListing listing = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
return listing;
if (useListV1) {
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(prefix);
listRequest.setDelimiter(delimiter);
listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(marker);
return OSSListRequest.v1(listRequest);
} else {
ListObjectsV2Request listV2Request = new ListObjectsV2Request(bucketName);
listV2Request.setPrefix(prefix);
listV2Request.setDelimiter(delimiter);
listV2Request.setMaxKeys(maxListingLength);
listV2Request.setContinuationToken(continuationToken);
return OSSListRequest.v2(listV2Request);
}
}

/**
Expand Down Expand Up @@ -478,21 +537,7 @@ public void close() {
* @throws IOException if failed to clean up objects.
*/
public void purge(String prefix) throws IOException {
String key;
try {
ObjectListing objects = listObjects(prefix, maxKeys, null, true);
for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey();
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}

for (String dir: objects.getCommonPrefixes()) {
deleteDirs(dir);
}
} catch (OSSException | ClientException e) {
LOG.error("Failed to purge " + prefix);
}
deleteDirs(prefix);
}

public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
Expand Down Expand Up @@ -520,12 +565,12 @@ public LocatedFileStatus next() throws IOException {

public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
final String prefix, final int maxListingLength, FileSystem fs,
PathFilter filter, FileStatusAcceptor acceptor, String delimiter) {
PathFilter filter, FileStatusAcceptor acceptor, boolean recursive) {
return new RemoteIterator<LocatedFileStatus>() {
private String nextMarker = null;
private boolean firstListing = true;
private boolean meetEnd = false;
private ListIterator<FileStatus> batchIterator;
private OSSListRequest listRequest = null;

@Override
public boolean hasNext() throws IOException {
Expand All @@ -550,15 +595,24 @@ public LocatedFileStatus next() throws IOException {
}

private boolean requestNextBatch() {
while (!meetEnd) {
if (continueListStatus()) {
return true;
}
}

return false;
}

private boolean continueListStatus() {
if (meetEnd) {
return false;
}
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(AliyunOSSUtils.maybeAddTrailingSlash(prefix));
listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(nextMarker);
listRequest.setDelimiter(delimiter);
ObjectListing listing = ossClient.listObjects(listRequest);
if (listRequest == null) {
listRequest = createListObjectsRequest(prefix,
maxListingLength, null, null, recursive);
}
OSSListResult listing = listObjects(listRequest);
List<FileStatus> stats = new ArrayList<>(
listing.getObjectSummaries().size() +
listing.getCommonPrefixes().size());
Expand All @@ -584,7 +638,12 @@ private boolean requestNextBatch() {

batchIterator = stats.listIterator();
if (listing.isTruncated()) {
nextMarker = listing.getNextMarker();
if (listing.isV1()) {
listRequest.getV1().setMarker(listing.getV1().getNextMarker());
} else {
listRequest.getV2().setContinuationToken(
listing.getV2().getNextContinuationToken());
}
} else {
meetEnd = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,8 @@ private Constants() {
public static final String UPLOAD_ACTIVE_BLOCKS_KEY =
"fs.oss.upload.active.blocks";
public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4;

public static final String LIST_VERSION = "fs.oss.list.version";

public static final int DEFAULT_LIST_VERSION = 2;
}
Loading