This file was deleted.

@@ -22,22 +22,22 @@
/**
* List of in-flight MPU uploads.
*/
public class OmMultipartUploadList {
public final class OmMultipartUploadList {

private List<OmMultipartUpload> uploads;
private String nextKeyMarker;
private String nextUploadIdMarker;
private boolean isTruncated;

public OmMultipartUploadList(
List<OmMultipartUpload> uploads,
String nextKeyMarker,
String nextUploadIdMarker,
boolean isTruncated) {
this.uploads = uploads;
this.nextKeyMarker = nextKeyMarker;
this.nextUploadIdMarker = nextUploadIdMarker;
this.isTruncated = isTruncated;
private OmMultipartUploadList(Builder builder) {
this.uploads = builder.uploads;
this.nextKeyMarker = builder.nextKeyMarker;
this.nextUploadIdMarker = builder.nextUploadIdMarker;
this.isTruncated = builder.isTruncated;
}

public static Builder newBuilder() {
return new Builder();
}

public List<OmMultipartUpload> getUploads() {
@@ -61,4 +61,40 @@ public boolean isTruncated() {
return isTruncated;
}

/**
* Builder class for OmMultipartUploadList.
*/
public static class Builder {
private List<OmMultipartUpload> uploads;
private String nextKeyMarker = "";
private String nextUploadIdMarker = "";
private boolean isTruncated;

public Builder() {
}

public Builder setUploads(List<OmMultipartUpload> uploads) {
this.uploads = uploads;
return this;
}

public Builder setNextKeyMarker(String nextKeyMarker) {
this.nextKeyMarker = nextKeyMarker;
return this;
}

public Builder setNextUploadIdMarker(String nextUploadIdMarker) {
this.nextUploadIdMarker = nextUploadIdMarker;
return this;
}

public Builder setIsTruncated(boolean isTruncated) {
this.isTruncated = isTruncated;
return this;
}

public OmMultipartUploadList build() {
return new OmMultipartUploadList(this);
}
}
}
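For reference, a minimal usage sketch of the new builder (illustrative only, not part of this change; it assumes java.util.List/ArrayList are imported, and the list contents and marker strings are placeholders). Markers that are never set default to empty strings rather than null:

List<OmMultipartUpload> uploads = new ArrayList<>();   // placeholder page content
OmMultipartUploadList page = OmMultipartUploadList.newBuilder()
    .setUploads(uploads)
    .setNextKeyMarker("next-key")             // defaults to "" when not set
    .setNextUploadIdMarker("next-upload-id")  // defaults to "" when not set
    .setIsTruncated(true)
    .build();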
@@ -1842,10 +1842,12 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
))
.collect(Collectors.toList());

OmMultipartUploadList response = new OmMultipartUploadList(uploadList,
listMultipartUploadsResponse.getNextKeyMarker(),
listMultipartUploadsResponse.getNextUploadIdMarker(),
listMultipartUploadsResponse.getIsTruncated());
OmMultipartUploadList response = OmMultipartUploadList.newBuilder()
.setUploads(uploadList)
.setNextKeyMarker(listMultipartUploadsResponse.getNextKeyMarker())
.setNextUploadIdMarker(listMultipartUploadsResponse.getNextUploadIdMarker())
.setIsTruncated(listMultipartUploadsResponse.getIsTruncated())
.build();

return response;
}
@@ -38,14 +38,14 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.MultipartUploadKeys;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -494,8 +494,11 @@ <KEY, VALUE> long countEstimatedRowsInTable(Table<KEY, VALUE> table)
/**
* Return the existing multipart upload keys, each of which includes the
* volumeName, bucketName and keyName.
* @param noPagination if true, returns all keys; if false, applies pagination
* @return When paginated, returns up to maxUploads + 1 entries, where the
* extra entry is used to determine the next page markers
*/
MultipartUploadKeys getMultipartUploadKeys(String volumeName,
List<OmMultipartUpload> getMultipartUploadKeys(String volumeName,
String bucketName, String prefix, String keyMarker, String uploadIdMarker, int maxUploads,
boolean noPagination) throws IOException;
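The javadoc above establishes the paging contract: a paginated call may return up to maxUploads + 1 entries, and the extra entry exists only so the caller can derive the markers for the next page. An illustrative sketch of a caller honouring that contract follows; it is not part of this change, the helper name and argument values are invented, and it assumes the enclosing interface is Ozone's OMMetadataManager.

// Illustrative only: page through all multipart uploads of a bucket using
// the "maxUploads + 1" contract documented above.
static void walkUploads(OMMetadataManager metadataManager) throws IOException {
  String keyMarker = "";
  String uploadIdMarker = "";
  final int maxUploads = 1000;
  while (true) {
    List<OmMultipartUpload> batch = metadataManager.getMultipartUploadKeys(
        "vol1", "bucket1", "", keyMarker, uploadIdMarker, maxUploads, false);
    boolean truncated = batch.size() == maxUploads + 1;
    List<OmMultipartUpload> page = truncated ? batch.subList(0, maxUploads) : batch;
    // ... consume `page` ...
    if (!truncated) {
      break;
    }
    OmMultipartUpload next = batch.get(maxUploads);  // extra entry marks the next page
    keyMarker = next.getKeyName();
    uploadIdMarker = next.getUploadId();
  }
}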

@@ -79,7 +79,6 @@
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -132,7 +131,6 @@
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
import org.apache.hadoop.ozone.om.helpers.MultipartUploadKeys;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -826,38 +824,25 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
bucketName);
try {

MultipartUploadKeys multipartUploadKeys = metadataManager
List<OmMultipartUpload> multipartUploadKeys = metadataManager
.getMultipartUploadKeys(volumeName, bucketName, prefix, keyMarker, uploadIdMarker, maxUploads,
!withPagination);
OmMultipartUploadList.Builder resultBuilder = OmMultipartUploadList.newBuilder();

List<OmMultipartUpload> collect = multipartUploadKeys.getKeys().stream()
.map(OmMultipartUpload::from)
.peek(upload -> {
try {
Table<String, OmMultipartKeyInfo> keyInfoTable =
metadataManager.getMultipartInfoTable();

OmMultipartKeyInfo multipartKeyInfo =
keyInfoTable.get(upload.getDbKey());

upload.setCreationTime(
Instant.ofEpochMilli(multipartKeyInfo.getCreationTime()));
upload.setReplicationConfig(
multipartKeyInfo.getReplicationConfig());
} catch (IOException e) {
LOG.warn(
"Open key entry for multipart upload record can't be read {}",
metadataManager.getOzoneKey(upload.getVolumeName(),
upload.getBucketName(), upload.getKeyName()));
}
})
.collect(Collectors.toList());
if (withPagination && multipartUploadKeys.size() == maxUploads + 1) {
int lastIndex = multipartUploadKeys.size() - 1;
OmMultipartUpload lastUpload = multipartUploadKeys.get(lastIndex);
resultBuilder.setNextKeyMarker(lastUpload.getKeyName())
.setNextUploadIdMarker(lastUpload.getUploadId())
.setIsTruncated(true);

// remove next upload from the list
multipartUploadKeys.remove(lastIndex);
}
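// Example: with maxUploads = 2 and the metadata manager returning three
// uploads u1, u2, u3, the size check above matches (3 == 2 + 1): the next-page
// markers are taken from u3, isTruncated is set, and only u1 and u2 remain in
// the page that is returned.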

return new OmMultipartUploadList(collect,
multipartUploadKeys.getNextKeyMarker(),
multipartUploadKeys.getNextUploadIdMarker(),
multipartUploadKeys.isTruncated());
return resultBuilder
.setUploads(multipartUploadKeys)
.build();

} catch (IOException ex) {
LOG.error("List Multipart Uploads Failed: volume: " + volumeName +
@@ -45,6 +45,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -56,9 +57,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -89,7 +89,6 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.MultipartUploadKeys;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
@@ -1936,12 +1935,11 @@ public <KEY, VALUE> long countEstimatedRowsInTable(Table<KEY, VALUE> table)
}

@Override
public MultipartUploadKeys getMultipartUploadKeys(
public List<OmMultipartUpload> getMultipartUploadKeys(
String volumeName, String bucketName, String prefix, String keyMarker,
String uploadIdMarker, int maxUploads, boolean noPagination) throws IOException {

MultipartUploadKeys.Builder resultBuilder = MultipartUploadKeys.newBuilder();
SortedSet<String> responseKeys = new TreeSet<>();
SortedMap<String, OmMultipartKeyInfo> responseKeys = new TreeMap<>();
Set<String> aborted = new HashSet<>();

String prefixKey =
@@ -1964,9 +1962,10 @@ public MultipartUploadKeys getMultipartUploadKeys(
String cacheKey = cacheEntry.getKey().getCacheKey();
if (cacheKey.startsWith(prefixKey)) {
// Check if it is marked for delete, due to abort mpu
if (cacheEntry.getValue().getCacheValue() != null &&
OmMultipartKeyInfo multipartKeyInfo = cacheEntry.getValue().getCacheValue();
if (multipartKeyInfo != null &&
cacheKey.compareTo(seekKey) >= 0) {
responseKeys.add(cacheKey);
responseKeys.put(cacheKey, multipartKeyInfo);
} else {
aborted.add(cacheKey);
}
@@ -1984,35 +1983,26 @@ public MultipartUploadKeys getMultipartUploadKeys(
KeyValue<String, OmMultipartKeyInfo> entry = iterator.next();
// If it is marked for abort, skip it.
if (!aborted.contains(entry.getKey())) {
responseKeys.add(entry.getKey());
responseKeys.put(entry.getKey(), entry.getValue());
dbKeysCount++;
}
}
}

if (noPagination) {
resultBuilder.setKeys(responseKeys);
return resultBuilder.build();
}
List<OmMultipartUpload> result = new ArrayList<>(responseKeys.size());

for (Map.Entry<String, OmMultipartKeyInfo> entry : responseKeys.entrySet()) {
OmMultipartUpload multipartUpload = OmMultipartUpload.from(entry.getKey());

multipartUpload.setCreationTime(
Instant.ofEpochMilli(entry.getValue().getCreationTime()));
multipartUpload.setReplicationConfig(
entry.getValue().getReplicationConfig());

String lastKey = responseKeys.stream()
.skip(maxUploads)
.findFirst()
.orElse(null);
if (lastKey != null) {
// implies the keyset size is greater than maxUploads
OmMultipartUpload lastKeyMultipartUpload = OmMultipartUpload.from(lastKey);
resultBuilder.setNextKeyMarker(lastKeyMultipartUpload.getKeyName())
.setNextUploadIdMarker(lastKeyMultipartUpload.getUploadId())
.setIsTruncated(true);

// keep the [0, maxUploads] keys
responseKeys = responseKeys.subSet(responseKeys.first(), lastKey);
result.add(multipartUpload);
}

return resultBuilder
.setKeys(responseKeys)
.build();
return noPagination || result.size() <= maxUploads ? result : result.subList(0, maxUploads + 1);
}
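To see the merge-then-cap idea of getMultipartUploadKeys in isolation, here is a small self-contained sketch; it uses plain strings in place of Ozone's multipart keys, all values are invented, and it is not code from this change.

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Standalone sketch of the merge-then-cap behaviour above (invented values).
public final class MergeAndCapSketch {
  public static void main(String[] args) {
    int maxUploads = 2;

    // Cache entries and DB iterator entries land in one sorted map, keyed by db key.
    SortedMap<String, String> merged = new TreeMap<>();
    merged.put("/vol/bucket/a/id-1", "from cache");
    merged.put("/vol/bucket/b/id-2", "from db");
    merged.put("/vol/bucket/c/id-3", "from db");
    merged.put("/vol/bucket/d/id-4", "from cache");

    // Hand back at most maxUploads entries for the page plus one extra marker entry.
    List<String> ordered = new ArrayList<>(merged.keySet());
    List<String> returned = ordered.size() <= maxUploads
        ? ordered
        : ordered.subList(0, maxUploads + 1);

    // The caller (see KeyManagerImpl above) turns the extra entry into the
    // next-page markers and drops it from the page handed to the client.
    boolean truncated = returned.size() == maxUploads + 1;
    String nextMarker = truncated ? returned.get(maxUploads) : "";
    List<String> page = truncated ? returned.subList(0, maxUploads) : returned;

    System.out.println("page = " + page + ", nextMarker = " + nextMarker);
    // page = [/vol/bucket/a/id-1, /vol/bucket/b/id-2], nextMarker = /vol/bucket/c/id-3
  }
}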

@Override