Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.common.BlockGroup;
Expand Down Expand Up @@ -352,6 +353,6 @@ <KEY, VALUE> long countEstimatedRowsInTable(Table<KEY, VALUE> table)
* Return the existing upload keys which includes volumeName, bucketName,
* keyName.
*/
List<String> getMultipartUploadKeys(String volumeName,
Set<String> getMultipartUploadKeys(String volumeName,
String bucketName, String prefix) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1244,17 +1245,13 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
bucketName);
try {

List<String> multipartUploadKeys =
Set<String> multipartUploadKeys =
metadataManager
.getMultipartUploadKeys(volumeName, bucketName, prefix);

List<OmMultipartUpload> collect = multipartUploadKeys.stream()
.map(OmMultipartUpload::from)
.peek(upload -> {
String dbKey = metadataManager
.getOzoneKey(upload.getVolumeName(),
upload.getBucketName(),
upload.getKeyName());
try {
Table<String, OmMultipartKeyInfo> keyInfoTable =
metadataManager.getMultipartInfoTable();
Expand All @@ -1271,7 +1268,8 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
} catch (IOException e) {
LOG.warn(
"Open key entry for multipart upload record can be read {}",
dbKey);
metadataManager.getOzoneKey(upload.getVolumeName(),
upload.getBucketName(), upload.getKeyName()));
}
})
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,21 +930,43 @@ public <KEY, VALUE> long countEstimatedRowsInTable(Table<KEY, VALUE> table)
}

@Override
public List<String> getMultipartUploadKeys(
public Set<String> getMultipartUploadKeys(
String volumeName, String bucketName, String prefix) throws IOException {
List<String> response = new ArrayList<>();

TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
iterator = getMultipartInfoTable().iterator();
Set<String> response = new TreeSet<>();
Set<String> aborted = new TreeSet<>();

Iterator<Map.Entry<CacheKey<String>, CacheValue<OmMultipartKeyInfo>>>
cacheIterator = getMultipartInfoTable().cacheIterator();

String prefixKey =
OmMultipartUpload.getDbKey(volumeName, bucketName, prefix);

// First iterate all the entries in cache.
while (cacheIterator.hasNext()) {
Map.Entry<CacheKey<String>, CacheValue<OmMultipartKeyInfo>> cacheEntry =
cacheIterator.next();
if (cacheEntry.getKey().getCacheKey().startsWith(prefixKey)) {
// Check if it is marked for delete, due to abort mpu
if (cacheEntry.getValue().getCacheValue() != null) {
response.add(cacheEntry.getKey().getCacheKey());
} else {
aborted.add(cacheEntry.getKey().getCacheKey());
}
}
}

TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
iterator = getMultipartInfoTable().iterator();
iterator.seek(prefixKey);

while (iterator.hasNext()) {
KeyValue<String, OmMultipartKeyInfo> entry = iterator.next();
if (entry.getKey().startsWith(prefixKey)) {
response.add(entry.getKey());
// If it is marked for abort, skip it.
if (!aborted.contains(entry.getKey())) {
response.add(entry.getKey());
}
} else {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,35 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.google.common.base.Optional;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs.Builder;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
import org.apache.hadoop.test.GenericTestUtils;

import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -124,6 +134,92 @@ public void listMultipartUploads() throws IOException {
uploads.get(1).getCreationTime().compareTo(startDate) > 0);
}

@Test
public void listMultipartUploadsWithFewEntriesInCache() throws IOException {
  String volume = UUID.randomUUID().toString();
  String bucket = UUID.randomUUID().toString();

  //GIVEN
  createBucket(metadataManager, volume, bucket);

  // Add a few uploads only to the table cache and a few to the DB, so the
  // listing below has to merge both sources.
  addinitMultipartUploadToCache(volume, bucket, "dir/key1");

  initMultipartUpload(keyManager, volume, bucket, "dir/key2");

  addinitMultipartUploadToCache(volume, bucket, "dir/key3");

  initMultipartUpload(keyManager, volume, bucket, "dir/key4");

  //WHEN
  OmMultipartUploadList omMultipartUploadList =
      keyManager.listMultipartUploads(volume, bucket, "");

  //THEN: cache and DB entries are merged and returned in sorted key order.
  List<OmMultipartUpload> uploads = omMultipartUploadList.getUploads();
  Assert.assertEquals(4, uploads.size());
  Assert.assertEquals("dir/key1", uploads.get(0).getKeyName());
  Assert.assertEquals("dir/key2", uploads.get(1).getKeyName());
  Assert.assertEquals("dir/key3", uploads.get(2).getKeyName());
  Assert.assertEquals("dir/key4", uploads.get(3).getKeyName());

  // Add a few more entries to exercise prefix-based listing.

  // Same way, add a few to the cache and a few to the DB.
  addinitMultipartUploadToCache(volume, bucket, "dir/ozonekey1");

  initMultipartUpload(keyManager, volume, bucket, "dir/ozonekey2");

  OmMultipartInfo omMultipartInfo3 = addinitMultipartUploadToCache(volume,
      bucket, "dir/ozonekey3");

  OmMultipartInfo omMultipartInfo4 = initMultipartUpload(keyManager,
      volume, bucket, "dir/ozonekey4");

  omMultipartUploadList =
      keyManager.listMultipartUploads(volume, bucket, "dir/ozone");

  //THEN: only the keys matching the prefix are returned.
  uploads = omMultipartUploadList.getUploads();
  Assert.assertEquals(4, uploads.size());
  Assert.assertEquals("dir/ozonekey1", uploads.get(0).getKeyName());
  Assert.assertEquals("dir/ozonekey2", uploads.get(1).getKeyName());
  Assert.assertEquals("dir/ozonekey3", uploads.get(2).getKeyName());
  Assert.assertEquals("dir/ozonekey4", uploads.get(3).getKeyName());

  // Abort the multipart upload whose record is in the DB; the abort is
  // recorded as an absent cache entry that must mask the DB row.
  abortMultipart(volume, bucket, "dir/ozonekey4",
      omMultipartInfo4.getUploadID());

  // Now list.
  omMultipartUploadList =
      keyManager.listMultipartUploads(volume, bucket, "dir/ozone");

  //THEN: the aborted DB entry is no longer listed.
  uploads = omMultipartUploadList.getUploads();
  Assert.assertEquals(3, uploads.size());
  Assert.assertEquals("dir/ozonekey1", uploads.get(0).getKeyName());
  Assert.assertEquals("dir/ozonekey2", uploads.get(1).getKeyName());
  Assert.assertEquals("dir/ozonekey3", uploads.get(2).getKeyName());

  // Abort the multipart upload whose record only exists in the cache.
  abortMultipart(volume, bucket, "dir/ozonekey3",
      omMultipartInfo3.getUploadID());

  // Now list.
  omMultipartUploadList =
      keyManager.listMultipartUploads(volume, bucket, "dir/ozone");

  //THEN: the aborted cache entry is no longer listed either.
  uploads = omMultipartUploadList.getUploads();
  Assert.assertEquals(2, uploads.size());
  Assert.assertEquals("dir/ozonekey1", uploads.get(0).getKeyName());
  Assert.assertEquals("dir/ozonekey2", uploads.get(1).getKeyName());
}

@Test
public void listMultipartUploadsWithPrefix() throws IOException {

Expand Down Expand Up @@ -177,4 +273,30 @@ private OmMultipartInfo initMultipartUpload(KeyManagerImpl omtest,
.build();
return omtest.initiateMultipartUpload(key1);
}

/**
 * Simulates an in-flight initiate-MPU: the multipart record is put only
 * into the table cache (not flushed to the DB), mirroring the double-buffer
 * state before commit.
 *
 * @return the OmMultipartInfo describing the cached upload.
 */
private OmMultipartInfo addinitMultipartUploadToCache(
    String volume, String bucket, String key) {
  String uploadID = UUID.randomUUID().toString();
  Map<Integer, OzoneManagerProtocolProtos.PartKeyInfo> parts = new HashMap<>();
  OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(uploadID,
      Time.now(), ReplicationType.RATIS, ReplicationFactor.THREE, parts);
  String dbKey =
      metadataManager.getMultipartKey(volume, bucket, key, uploadID);
  metadataManager.getMultipartInfoTable().addCacheEntry(
      new CacheKey<>(dbKey),
      new CacheValue<>(Optional.of(multipartKeyInfo), RandomUtils.nextInt()));
  return new OmMultipartInfo(volume, bucket, key, uploadID);
}

/**
 * Simulates an abort of the given multipart upload by adding an absent
 * (tombstone) cache entry for its DB key, which masks any DB row with the
 * same key during listing.
 */
private void abortMultipart(
    String volume, String bucket, String key, String uploadID) {
  // The original declared an unused partKeyInfoMap here; removed.
  metadataManager.getMultipartInfoTable().addCacheEntry(
      new CacheKey<>(metadataManager.getMultipartKey(volume, bucket, key,
          uploadID)), new CacheValue<>(Optional.absent(),
          RandomUtils.nextInt()));
}

}